1 /*
2    Copyright (c) 2016, 2017 MariaDB
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
16 
17 #include "mariadb.h"
18 #include "sql_parse.h"
19 #include "sql_select.h"
20 #include "sql_list.h"
21 #include "item_windowfunc.h"
22 #include "filesort.h"
23 #include "sql_base.h"
24 #include "sql_window.h"
25 
26 
27 bool
check_window_names(List_iterator_fast<Window_spec> & it)28 Window_spec::check_window_names(List_iterator_fast<Window_spec> &it)
29 {
30   if (window_names_are_checked)
31     return false;
32   const char *name= this->name();
33   const char *ref_name= window_reference();
34   it.rewind();
35   Window_spec *win_spec;
36   while((win_spec= it++) && win_spec != this)
37   {
38     const char *win_spec_name= win_spec->name();
39     if (!win_spec_name)
40       break;
41     if (name && my_strcasecmp(system_charset_info, name, win_spec_name) == 0)
42     {
43       my_error(ER_DUP_WINDOW_NAME, MYF(0), name);
44       return true;
45     }
46     if (ref_name &&
47         my_strcasecmp(system_charset_info, ref_name, win_spec_name) == 0)
48     {
49       if (partition_list->elements)
50       {
51         my_error(ER_PARTITION_LIST_IN_REFERENCING_WINDOW_SPEC, MYF(0),
52                  ref_name);
53         return true;
54       }
55       if (win_spec->order_list->elements && order_list->elements)
56       {
57         my_error(ER_ORDER_LIST_IN_REFERENCING_WINDOW_SPEC, MYF(0), ref_name);
58         return true;
59       }
60       if (win_spec->window_frame)
61       {
62         my_error(ER_WINDOW_FRAME_IN_REFERENCED_WINDOW_SPEC, MYF(0), ref_name);
63         return true;
64       }
65       referenced_win_spec= win_spec;
66       if (partition_list->elements == 0)
67         partition_list= win_spec->partition_list;
68       if (order_list->elements == 0)
69         order_list= win_spec->order_list;
70     }
71   }
72   if (ref_name && !referenced_win_spec)
73   {
74     my_error(ER_WRONG_WINDOW_SPEC_NAME, MYF(0), ref_name);
75     return true;
76   }
77   window_names_are_checked= true;
78   return false;
79 }
80 
81 void
print(String * str,enum_query_type query_type)82 Window_spec::print(String *str, enum_query_type query_type)
83 {
84   str->append('(');
85   print_partition(str, query_type);
86   print_order(str, query_type);
87 
88   if (window_frame)
89     window_frame->print(str, query_type);
90   str->append(')');
91 }
92 
93 void
print_partition(String * str,enum_query_type query_type)94 Window_spec::print_partition(String *str, enum_query_type query_type)
95 {
96   if (partition_list->first)
97   {
98     str->append(STRING_WITH_LEN(" partition by "));
99     st_select_lex::print_order(str, partition_list->first, query_type);
100   }
101 }
102 
103 void
print_order(String * str,enum_query_type query_type)104 Window_spec::print_order(String *str, enum_query_type query_type)
105 {
106   if (order_list->first)
107   {
108     str->append(STRING_WITH_LEN(" order by "));
109     st_select_lex::print_order(str, order_list->first, query_type);
110   }
111 }
112 
113 bool
check_frame_bounds()114 Window_frame::check_frame_bounds()
115 {
116   if ((top_bound->is_unbounded() &&
117        top_bound->precedence_type == Window_frame_bound::FOLLOWING) ||
118       (bottom_bound->is_unbounded() &&
119        bottom_bound->precedence_type == Window_frame_bound::PRECEDING) ||
120       (top_bound->precedence_type == Window_frame_bound::CURRENT &&
121        bottom_bound->precedence_type == Window_frame_bound::PRECEDING) ||
122       (bottom_bound->precedence_type == Window_frame_bound::CURRENT &&
123        top_bound->precedence_type == Window_frame_bound::FOLLOWING))
124   {
125     my_error(ER_BAD_COMBINATION_OF_WINDOW_FRAME_BOUND_SPECS, MYF(0));
126     return true;
127   }
128 
129   return false;
130 }
131 
132 
133 void
print(String * str,enum_query_type query_type)134 Window_frame::print(String *str, enum_query_type query_type)
135 {
136   switch (units) {
137   case UNITS_ROWS:
138     str->append(STRING_WITH_LEN(" rows "));
139     break;
140   case UNITS_RANGE:
141     str->append(STRING_WITH_LEN(" range "));
142     break;
143   default:
144     DBUG_ASSERT(0);
145   }
146 
147   str->append(STRING_WITH_LEN("between "));
148   top_bound->print(str, query_type);
149   str->append(STRING_WITH_LEN(" and "));
150   bottom_bound->print(str, query_type);
151 
152   if (exclusion != EXCL_NONE)
153   {
154      str->append(STRING_WITH_LEN(" exclude "));
155      switch (exclusion) {
156      case EXCL_CURRENT_ROW:
157        str->append(STRING_WITH_LEN(" current row "));
158        break;
159      case EXCL_GROUP:
160        str->append(STRING_WITH_LEN(" group "));
161        break;
162      case EXCL_TIES:
163        str->append(STRING_WITH_LEN(" ties "));
164        break;
165      default:
166        DBUG_ASSERT(0);
167        ;
168      }
169   }
170 }
171 
172 
173 void
print(String * str,enum_query_type query_type)174 Window_frame_bound::print(String *str, enum_query_type query_type)
175 {
176   if (precedence_type == CURRENT)
177   {
178     str->append(STRING_WITH_LEN(" current row "));
179     return;
180   }
181   if (is_unbounded())
182     str->append(STRING_WITH_LEN(" unbounded "));
183   else
184     offset->print(str ,query_type);
185   switch (precedence_type) {
186   case PRECEDING:
187     str->append(STRING_WITH_LEN(" preceding "));
188     break;
189   case FOLLOWING:
190     str->append(STRING_WITH_LEN(" following "));
191     break;
192   default:
193     DBUG_ASSERT(0);
194   }
195 }
196 
197 /*
198   Setup window functions in a select
199 */
200 
201 int
setup_windows(THD * thd,Ref_ptr_array ref_pointer_array,TABLE_LIST * tables,List<Item> & fields,List<Item> & all_fields,List<Window_spec> & win_specs,List<Item_window_func> & win_funcs)202 setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables,
203               List<Item> &fields, List<Item> &all_fields,
204               List<Window_spec> &win_specs, List<Item_window_func> &win_funcs)
205 {
206   Window_spec *win_spec;
207   DBUG_ENTER("setup_windows");
208   List_iterator<Window_spec> it(win_specs);
209 
210   /*
211     Move all unnamed specifications after the named ones.
212     We could have avoided it if we had built two separate lists for
213     named and unnamed specifications.
214   */
215   Query_arena *arena, backup;
216   arena= thd->activate_stmt_arena_if_needed(&backup);
217   uint i = 0;
218   uint elems= win_specs.elements;
219   while ((win_spec= it++) && i++ < elems)
220   {
221     if (win_spec->name() == NULL)
222     {
223       it.remove();
224       win_specs.push_back(win_spec);
225     }
226   }
227   if (arena)
228     thd->restore_active_arena(arena, &backup);
229 
230   it.rewind();
231 
232   List_iterator_fast<Window_spec> itp(win_specs);
233 
234   while ((win_spec= it++))
235   {
236     bool hidden_group_fields;
237     if (win_spec->check_window_names(itp) ||
238         setup_group(thd, ref_pointer_array, tables, fields, all_fields,
239                     win_spec->partition_list->first, &hidden_group_fields,
240                     true) ||
241         setup_order(thd, ref_pointer_array, tables, fields, all_fields,
242                     win_spec->order_list->first, true) ||
243         (win_spec->window_frame &&
244          win_spec->window_frame->check_frame_bounds()))
245     {
246       DBUG_RETURN(1);
247     }
248 
249     if (win_spec->window_frame &&
250         win_spec->window_frame->exclusion != Window_frame::EXCL_NONE)
251     {
252       my_error(ER_FRAME_EXCLUSION_NOT_SUPPORTED, MYF(0));
253       DBUG_RETURN(1);
254     }
255     /*
256        For  "win_func() OVER (ORDER BY order_list RANGE BETWEEN ...)",
257        - ORDER BY order_list must not be ommitted
258        - the list must have a single element.
259     */
260     if (win_spec->window_frame &&
261         win_spec->window_frame->units == Window_frame::UNITS_RANGE)
262     {
263       if (win_spec->order_list->elements != 1)
264       {
265         my_error(ER_RANGE_FRAME_NEEDS_SIMPLE_ORDERBY, MYF(0));
266         DBUG_RETURN(1);
267       }
268 
269       /*
270         "The declared type of SK shall be numeric, datetime, or interval"
271         we don't support datetime or interval, yet.
272       */
273       Item_result rtype= win_spec->order_list->first->item[0]->result_type();
274       if (rtype != REAL_RESULT && rtype != INT_RESULT &&
275           rtype != DECIMAL_RESULT)
276       {
277         my_error(ER_WRONG_TYPE_FOR_RANGE_FRAME, MYF(0));
278         DBUG_RETURN(1);
279       }
280 
281       /*
282         "The declared type of UVS shall be numeric if the declared type of SK
283         is numeric; otherwise, it shall be an interval type that may be added
284         to or subtracted from the declared type of SK"
285       */
286       Window_frame_bound *bounds[]= {win_spec->window_frame->top_bound,
287                                      win_spec->window_frame->bottom_bound,
288                                      NULL};
289       for (Window_frame_bound **pbound= &bounds[0]; *pbound; pbound++)
290       {
291         if (!(*pbound)->is_unbounded() &&
292             ((*pbound)->precedence_type == Window_frame_bound::FOLLOWING ||
293              (*pbound)->precedence_type == Window_frame_bound::PRECEDING))
294         {
295           Item_result rtype= (*pbound)->offset->result_type();
296           if (rtype != REAL_RESULT && rtype != INT_RESULT &&
297               rtype != DECIMAL_RESULT)
298           {
299             my_error(ER_WRONG_TYPE_FOR_RANGE_FRAME, MYF(0));
300             DBUG_RETURN(1);
301           }
302         }
303       }
304     }
305 
306     /* "ROWS PRECEDING|FOLLOWING $n" must have a numeric $n */
307     if (win_spec->window_frame &&
308         win_spec->window_frame->units == Window_frame::UNITS_ROWS)
309     {
310       Window_frame_bound *bounds[]= {win_spec->window_frame->top_bound,
311                                      win_spec->window_frame->bottom_bound,
312                                      NULL};
313       for (Window_frame_bound **pbound= &bounds[0]; *pbound; pbound++)
314       {
315         if (!(*pbound)->is_unbounded() &&
316             ((*pbound)->precedence_type == Window_frame_bound::FOLLOWING ||
317              (*pbound)->precedence_type == Window_frame_bound::PRECEDING))
318         {
319           Item *offset= (*pbound)->offset;
320           if (offset->result_type() != INT_RESULT)
321           {
322             my_error(ER_WRONG_TYPE_FOR_ROWS_FRAME, MYF(0));
323             DBUG_RETURN(1);
324           }
325         }
326       }
327     }
328   }
329 
330   List_iterator_fast<Item_window_func> li(win_funcs);
331   while (Item_window_func * win_func_item= li++)
332   {
333     if (win_func_item->check_result_type_of_order_item())
334       DBUG_RETURN(1);
335   }
336   DBUG_RETURN(0);
337 }
338 
339 
340 /**
341   @brief
342   Find fields common for all partition lists used in window functions
343 
344   @param thd       The thread handle
345 
346   @details
347    This function looks for the field references in the partition lists
348    of all window functions used in this select that are common for
349    all the partition lists. The function returns an ORDER list contained
350    all such references.The list either is specially built by the function
351    or is taken directly from the first window specification.
352 
353   @retval
354     pointer to the first element of the ORDER list contained field
355     references common for all partition lists
356     0 if no such reference is found.
357 */
358 
find_common_window_func_partition_fields(THD * thd)359 ORDER *st_select_lex::find_common_window_func_partition_fields(THD *thd)
360 {
361   ORDER *ord;
362   Item *item;
363   DBUG_ASSERT(window_funcs.elements);
364   List_iterator_fast<Item_window_func> it(window_funcs);
365   Item_window_func *first_wf= it++;
366   if (!first_wf->window_spec->partition_list)
367     return 0;
368   List<Item> common_fields;
369   uint first_partition_elements= 0;
370   for (ord= first_wf->window_spec->partition_list->first; ord; ord= ord->next)
371   {
372     if ((*ord->item)->real_item()->type() == Item::FIELD_ITEM)
373       common_fields.push_back(*ord->item, thd->mem_root);
374     first_partition_elements++;
375   }
376   if (window_specs.elements == 1 &&
377       common_fields.elements == first_partition_elements)
378     return first_wf->window_spec->partition_list->first;
379   List_iterator<Item> li(common_fields);
380   Item_window_func *wf;
381   while (common_fields.elements && (wf= it++))
382   {
383     if (!wf->window_spec->partition_list)
384       return 0;
385     while ((item= li++))
386     {
387       for (ord= wf->window_spec->partition_list->first; ord; ord= ord->next)
388       {
389         if (item->eq(*ord->item, false))
390 	  break;
391       }
392       if (!ord)
393         li.remove();
394     }
395     li.rewind();
396   }
397   if (!common_fields.elements)
398     return 0;
399   if (common_fields.elements == first_partition_elements)
400     return first_wf->window_spec->partition_list->first;
401   SQL_I_List<ORDER> res_list;
402   for (ord= first_wf->window_spec->partition_list->first, item= li++;
403        ord; ord= ord->next)
404   {
405     if (item != *ord->item)
406       continue;
407     if (add_to_list(thd, res_list, item, ord->direction))
408       return 0;
409     item= li++;
410   }
411   return res_list.first;
412 }
413 
414 
415 /////////////////////////////////////////////////////////////////////////////
416 // Sorting window functions to minimize the number of table scans
417 // performed during the computation of these functions
418 /////////////////////////////////////////////////////////////////////////////
419 
420 #define CMP_LT        -2    // Less than
421 #define CMP_LT_C      -1    // Less than and compatible
422 #define CMP_EQ         0    // Equal to
423 #define CMP_GT_C       1    // Greater than and compatible
#define CMP_GT         2    // Greater than
425 
426 static
compare_order_elements(ORDER * ord1,ORDER * ord2)427 int compare_order_elements(ORDER *ord1, ORDER *ord2)
428 {
429   if (*ord1->item == *ord2->item && ord1->direction == ord2->direction)
430     return CMP_EQ;
431   Item *item1= (*ord1->item)->real_item();
432   Item *item2= (*ord2->item)->real_item();
433   DBUG_ASSERT(item1->type() == Item::FIELD_ITEM &&
434               item2->type() == Item::FIELD_ITEM);
435   int cmp= ((Item_field *) item1)->field->field_index -
436            ((Item_field *) item2)->field->field_index;
437   if (cmp == 0)
438   {
439     if (ord1->direction == ord2->direction)
440       return CMP_EQ;
441     return ord1->direction > ord2->direction ? CMP_GT : CMP_LT;
442   }
443   else
444     return cmp > 0 ? CMP_GT : CMP_LT;
445 }
446 
447 static
compare_order_lists(SQL_I_List<ORDER> * part_list1,SQL_I_List<ORDER> * part_list2)448 int compare_order_lists(SQL_I_List<ORDER> *part_list1,
449                         SQL_I_List<ORDER> *part_list2)
450 {
451   if (part_list1 == part_list2)
452     return CMP_EQ;
453   ORDER *elem1= part_list1->first;
454   ORDER *elem2= part_list2->first;
455   for ( ; elem1 && elem2; elem1= elem1->next, elem2= elem2->next)
456   {
457     int cmp;
458     // remove all constants as we don't need them for comparision
459     while(elem1 && ((*elem1->item)->real_item())->const_item())
460     {
461       elem1= elem1->next;
462       continue;
463     }
464 
465     while(elem2 && ((*elem2->item)->real_item())->const_item())
466     {
467       elem2= elem2->next;
468       continue;
469     }
470 
471     if (!elem1 || !elem2)
472       break;
473 
474     if ((cmp= compare_order_elements(elem1, elem2)))
475       return cmp;
476   }
477   if (elem1)
478     return CMP_GT_C;
479   if (elem2)
480     return CMP_LT_C;
481   return CMP_EQ;
482 }
483 
484 
485 static
compare_window_frame_bounds(Window_frame_bound * win_frame_bound1,Window_frame_bound * win_frame_bound2,bool is_bottom_bound)486 int compare_window_frame_bounds(Window_frame_bound *win_frame_bound1,
487                                 Window_frame_bound *win_frame_bound2,
488                                 bool is_bottom_bound)
489 {
490   int res;
491   if (win_frame_bound1->precedence_type != win_frame_bound2->precedence_type)
492   {
493     res= win_frame_bound1->precedence_type > win_frame_bound2->precedence_type ?
494          CMP_GT : CMP_LT;
495     if (is_bottom_bound)
496       res= -res;
497     return res;
498   }
499 
500   if (win_frame_bound1->is_unbounded() && win_frame_bound2->is_unbounded())
501     return CMP_EQ;
502 
503   if (!win_frame_bound1->is_unbounded() && !win_frame_bound2->is_unbounded())
504   {
505     if (win_frame_bound1->offset->eq(win_frame_bound2->offset, true))
506       return CMP_EQ;
507     else
508     {
509       res= strcmp(win_frame_bound1->offset->name.str,
510                   win_frame_bound2->offset->name.str);
511       res= res > 0 ? CMP_GT : CMP_LT;
512       if (is_bottom_bound)
513         res= -res;
514       return res;
515     }
516   }
517 
518   /*
519     Here we have:
520     win_frame_bound1->is_unbounded() != win_frame_bound1->is_unbounded()
521   */
522   return is_bottom_bound != win_frame_bound1->is_unbounded() ? CMP_LT : CMP_GT;
523 }
524 
525 
526 static
compare_window_frames(Window_frame * win_frame1,Window_frame * win_frame2)527 int compare_window_frames(Window_frame *win_frame1,
528                           Window_frame *win_frame2)
529 {
530   int cmp;
531 
532   if (win_frame1 == win_frame2)
533     return CMP_EQ;
534 
535   if (!win_frame1)
536     return CMP_LT;
537 
538   if (!win_frame2)
539     return CMP_GT;
540 
541   if (win_frame1->units != win_frame2->units)
542     return win_frame1->units > win_frame2->units ? CMP_GT : CMP_LT;
543 
544   cmp= compare_window_frame_bounds(win_frame1->top_bound,
545                                    win_frame2->top_bound,
546                                    false);
547   if (cmp)
548     return cmp;
549 
550   cmp= compare_window_frame_bounds(win_frame1->bottom_bound,
551                                    win_frame2->bottom_bound,
552                                    true);
553   if (cmp)
554     return cmp;
555 
556   if (win_frame1->exclusion != win_frame2->exclusion)
557     return win_frame1->exclusion > win_frame2->exclusion ? CMP_GT_C : CMP_LT_C;
558 
559   return CMP_EQ;
560 }
561 
562 static
compare_window_spec_joined_lists(Window_spec * win_spec1,Window_spec * win_spec2)563 int compare_window_spec_joined_lists(Window_spec *win_spec1,
564                                      Window_spec *win_spec2)
565 {
566   win_spec1->join_partition_and_order_lists();
567   win_spec2->join_partition_and_order_lists();
568   int cmp= compare_order_lists(win_spec1->partition_list,
569                                win_spec2->partition_list);
570   win_spec1->disjoin_partition_and_order_lists();
571   win_spec2->disjoin_partition_and_order_lists();
572   return cmp;
573 }
574 
575 
576 static
compare_window_funcs_by_window_specs(Item_window_func * win_func1,Item_window_func * win_func2,void * arg)577 int compare_window_funcs_by_window_specs(Item_window_func *win_func1,
578                                          Item_window_func *win_func2,
579                                          void *arg)
580 {
581   int cmp;
582   Window_spec *win_spec1= win_func1->window_spec;
583   Window_spec *win_spec2= win_func2->window_spec;
584   if (win_spec1 == win_spec2)
585     return CMP_EQ;
586   cmp= compare_order_lists(win_spec1->partition_list,
587                            win_spec2->partition_list);
588   if (cmp == CMP_EQ)
589   {
590     /*
591       Partition lists contain the same elements.
592       Let's use only one of the lists.
593     */
594     if (!win_spec1->name() && win_spec2->name())
595     {
596       win_spec1->save_partition_list= win_spec1->partition_list;
597       win_spec1->partition_list= win_spec2->partition_list;
598     }
599     else
600     {
601       win_spec2->save_partition_list= win_spec2->partition_list;
602       win_spec2->partition_list= win_spec1->partition_list;
603     }
604 
605     cmp= compare_order_lists(win_spec1->order_list,
606                              win_spec2->order_list);
607 
608     if (cmp != CMP_EQ)
609       return cmp;
610 
611     /*
612        Order lists contain the same elements.
613        Let's use only one of the lists.
614     */
615     if (!win_spec1->name() && win_spec2->name())
616     {
617       win_spec1->save_order_list= win_spec2->order_list;
618       win_spec1->order_list= win_spec2->order_list;
619     }
620     else
621     {
622       win_spec1->save_order_list= win_spec2->order_list;
623       win_spec2->order_list= win_spec1->order_list;
624     }
625 
626     cmp= compare_window_frames(win_spec1->window_frame,
627                                win_spec2->window_frame);
628 
629     if (cmp != CMP_EQ)
630       return cmp;
631 
632     /* Window frames are equal. Let's use only one of them. */
633     if (!win_spec1->name() && win_spec2->name())
634       win_spec1->window_frame= win_spec2->window_frame;
635     else
636       win_spec2->window_frame= win_spec1->window_frame;
637 
638     return CMP_EQ;
639   }
640 
641   if (cmp == CMP_GT || cmp == CMP_LT)
642     return cmp;
643 
644   /* one of the partitions lists is the proper beginning of the another */
645   cmp= compare_window_spec_joined_lists(win_spec1, win_spec2);
646 
647   if (CMP_LT_C <= cmp && cmp <= CMP_GT_C)
648     cmp= win_spec1->partition_list->elements <
649       win_spec2->partition_list->elements ? CMP_GT_C : CMP_LT_C;
650 
651   return cmp;
652 }
653 
654 
/*
  Bit flags stored in Item_window_func::marker by
  order_window_funcs_by_window_specs() to mark what changes between a window
  function and the one preceding it in the sorted list.
*/
#define  SORTORDER_CHANGE_FLAG    1
#define  PARTITION_CHANGE_FLAG    2
#define  FRAME_CHANGE_FLAG        4

/*
  Type of a function comparing two window functions; 'arg' is an extra
  argument handed to the comparison function.
*/
typedef int (*Item_window_func_cmp)(Item_window_func *f1,
                                    Item_window_func *f2,
                                    void *arg);
662 /*
663   @brief
664     Sort window functions so that those that can be computed together are
665     adjacent.
666 
667   @detail
668     Sort window functions by their
669      - required sorting order,
670      - partition list,
671      - window frame compatibility.
672 
673     The changes between the groups are marked by setting item_window_func->marker.
674 */
675 
676 static
order_window_funcs_by_window_specs(List<Item_window_func> * win_func_list)677 void order_window_funcs_by_window_specs(List<Item_window_func> *win_func_list)
678 {
679   if (win_func_list->elements == 0)
680     return;
681 
682   bubble_sort<Item_window_func>(win_func_list,
683                                 compare_window_funcs_by_window_specs,
684                                 NULL);
685 
686   List_iterator_fast<Item_window_func> it(*win_func_list);
687   Item_window_func *prev= it++;
688   prev->marker= SORTORDER_CHANGE_FLAG |
689                 PARTITION_CHANGE_FLAG |
690                 FRAME_CHANGE_FLAG;
691   Item_window_func *curr;
692   while ((curr= it++))
693   {
694     Window_spec *win_spec_prev= prev->window_spec;
695     Window_spec *win_spec_curr= curr->window_spec;
696     curr->marker= 0;
697     if (!(win_spec_prev->partition_list == win_spec_curr->partition_list &&
698           win_spec_prev->order_list == win_spec_curr->order_list))
699     {
700       int cmp;
701       if (win_spec_prev->partition_list == win_spec_curr->partition_list)
702         cmp= compare_order_lists(win_spec_prev->order_list,
703                                  win_spec_curr->order_list);
704       else
705         cmp= compare_window_spec_joined_lists(win_spec_prev, win_spec_curr);
706       if (!(CMP_LT_C <= cmp && cmp <= CMP_GT_C))
707       {
708         curr->marker= SORTORDER_CHANGE_FLAG |
709                       PARTITION_CHANGE_FLAG |
710                       FRAME_CHANGE_FLAG;
711       }
712       else if (win_spec_prev->partition_list != win_spec_curr->partition_list)
713       {
714         curr->marker|= PARTITION_CHANGE_FLAG | FRAME_CHANGE_FLAG;
715       }
716     }
717     else if (win_spec_prev->window_frame != win_spec_curr->window_frame)
718       curr->marker|= FRAME_CHANGE_FLAG;
719 
720     prev= curr;
721   }
722 }
723 
724 
725 /////////////////////////////////////////////////////////////////////////////
726 
727 
728 /////////////////////////////////////////////////////////////////////////////
729 // Window Frames support
730 /////////////////////////////////////////////////////////////////////////////
731 
732 // note: make rr_from_pointers static again when not need it here anymore
733 int rr_from_pointers(READ_RECORD *info);
734 
735 
736 /////////////////////////////////////////////////////////////////////////////
737 
738 
739 /*
740   A cursor over a sequence of rowids. One can
741    - Move to next rowid
742    - jump to given number in the sequence
743    - Know the number of the current rowid (i.e. how many rowids have been read)
744 */
745 
746 class Rowid_seq_cursor
747 {
748 public:
Rowid_seq_cursor()749   Rowid_seq_cursor() : io_cache(NULL), ref_buffer(0) {}
~Rowid_seq_cursor()750   virtual ~Rowid_seq_cursor()
751   {
752     if (ref_buffer)
753       my_free(ref_buffer);
754     if (io_cache)
755     {
756       end_slave_io_cache(io_cache);
757       my_free(io_cache);
758       io_cache= NULL;
759     }
760   }
761 
762 private:
763   /* Length of one rowid element */
764   size_t ref_length;
765 
766   /* If io_cache=!NULL, use it */
767   IO_CACHE *io_cache;
768   uchar *ref_buffer;   /* Buffer for the last returned rowid */
769   ha_rows rownum;     /* Number of the rowid that is about to be returned */
770   ha_rows current_ref_buffer_rownum;
771   bool ref_buffer_valid;
772 
773   /* The following are used when we are reading from an array of pointers */
774   uchar *cache_start;
775   uchar *cache_pos;
776   uchar *cache_end;
777 public:
778 
init(READ_RECORD * info)779   void init(READ_RECORD *info)
780   {
781     ref_length= info->ref_length;
782     if (info->read_record_func == rr_from_pointers)
783     {
784       io_cache= NULL;
785       cache_start= info->cache_pos;
786       cache_pos=   info->cache_pos;
787       cache_end=   info->cache_end;
788     }
789     else
790     {
791       //DBUG_ASSERT(info->read_record == rr_from_tempfile);
792       rownum= 0;
793       io_cache= (IO_CACHE*)my_malloc(PSI_INSTRUMENT_ME, sizeof(IO_CACHE), MYF(0));
794       init_slave_io_cache(info->io_cache, io_cache);
795 
796       ref_buffer= (uchar*)my_malloc(PSI_INSTRUMENT_ME, ref_length, MYF(0));
797       ref_buffer_valid= false;
798     }
799   }
800 
next()801   virtual int next()
802   {
803     /* Allow multiple next() calls in EOF state. */
804     if (at_eof())
805         return -1;
806 
807     if (io_cache)
808     {
809       rownum++;
810     }
811     else
812     {
813       cache_pos+= ref_length;
814       DBUG_ASSERT(cache_pos <= cache_end);
815     }
816     return 0;
817   }
818 
prev()819   virtual int prev()
820   {
821     if (io_cache)
822     {
823       if (rownum == 0)
824         return -1;
825 
826       rownum--;
827       return 0;
828     }
829     else
830     {
831       /* Allow multiple prev() calls when positioned at the start. */
832       if (cache_pos == cache_start)
833         return -1;
834       cache_pos-= ref_length;
835       DBUG_ASSERT(cache_pos >= cache_start);
836       return 0;
837     }
838   }
839 
get_rownum() const840   ha_rows get_rownum() const
841   {
842     if (io_cache)
843       return rownum;
844     else
845       return (cache_pos - cache_start) / ref_length;
846   }
847 
move_to(ha_rows row_number)848   void move_to(ha_rows row_number)
849   {
850     if (io_cache)
851     {
852       rownum= row_number;
853     }
854     else
855     {
856       cache_pos= MY_MIN(cache_end, cache_start + row_number * ref_length);
857       DBUG_ASSERT(cache_pos <= cache_end);
858     }
859   }
860 
861 protected:
at_eof()862   bool at_eof()
863   {
864     if (io_cache)
865     {
866       return rownum * ref_length >= io_cache->end_of_file;
867     }
868     else
869       return (cache_pos == cache_end);
870   }
871 
get_curr_rowid(uchar ** row_id)872   bool get_curr_rowid(uchar **row_id)
873   {
874     if (io_cache)
875     {
876       DBUG_ASSERT(!at_eof());
877       if (!ref_buffer_valid || current_ref_buffer_rownum != rownum)
878       {
879         seek_io_cache(io_cache, rownum * ref_length);
880         if (my_b_read(io_cache,ref_buffer,ref_length))
881         {
882           /* Error reading from file. */
883           return true;
884         }
885         ref_buffer_valid= true;
886         current_ref_buffer_rownum = rownum;
887       }
888       *row_id = ref_buffer;
889       return false;
890     }
891     else
892     {
893       *row_id= cache_pos;
894       return false;
895     }
896   }
897 };
898 
899 
900 /*
901   Cursor which reads from rowid sequence and also retrieves table rows.
902 */
903 
904 class Table_read_cursor : public Rowid_seq_cursor
905 {
906 public:
~Table_read_cursor()907   virtual ~Table_read_cursor() {}
908 
init(READ_RECORD * info)909   void init(READ_RECORD *info)
910   {
911     Rowid_seq_cursor::init(info);
912     table= info->table;
913     record= info->record();
914   }
915 
fetch()916   virtual int fetch()
917   {
918     if (at_eof())
919       return -1;
920 
921     uchar* curr_rowid;
922     if (get_curr_rowid(&curr_rowid))
923       return -1;
924     return table->file->ha_rnd_pos(record, curr_rowid);
925   }
926 
927 private:
928   /* The table that is acccesed by this cursor. */
929   TABLE *table;
930   /* Buffer where to store the table's record data. */
931   uchar *record;
932 
933   // TODO(spetrunia): should move_to() also read row here?
934 };
935 
936 
937 /*
938   A cursor which only moves within a partition. The scan stops at the partition
939   end, and it needs an explicit command to move to the next partition.
940 
941   This cursor can not move backwards.
942 */
943 
944 class Partition_read_cursor : public Table_read_cursor
945 {
946 public:
Partition_read_cursor(THD * thd,SQL_I_List<ORDER> * partition_list)947   Partition_read_cursor(THD *thd, SQL_I_List<ORDER> *partition_list) :
948     bound_tracker(thd, partition_list) {}
949 
init(READ_RECORD * info)950   void init(READ_RECORD *info)
951   {
952     Table_read_cursor::init(info);
953     bound_tracker.init();
954     end_of_partition= false;
955   }
956 
957   /*
958     Informs the cursor that we need to move into the next partition.
959     The next partition is provided in two ways:
960     - in table->record[0]..
961     - rownum parameter has the row number.
962   */
on_next_partition(ha_rows rownum)963   void on_next_partition(ha_rows rownum)
964   {
965     /* Remember the sort key value from the new partition */
966     move_to(rownum);
967     bound_tracker.check_if_next_group();
968     end_of_partition= false;
969 
970   }
971 
972   /*
973     This returns -1 when end of partition was reached.
974   */
next()975   int next()
976   {
977     int res;
978     if (end_of_partition)
979       return -1;
980 
981     if ((res= Table_read_cursor::next()) ||
982         (res= fetch()))
983     {
984       /* TODO(cvicentiu) This does not consider table read failures.
985          Perhaps assuming end of table like this is fine in that case. */
986 
987       /* This row is the final row in the table. To maintain semantics
988          that cursors always point to the last valid row, move back one step,
989          but mark end_of_partition as true. */
990       Table_read_cursor::prev();
991       end_of_partition= true;
992       return res;
993     }
994 
995     if (bound_tracker.compare_with_cache())
996     {
997       /* This row is part of a new partition, don't move
998          forward any more untill we get informed of a new partition. */
999       Table_read_cursor::prev();
1000       end_of_partition= true;
1001       return -1;
1002     }
1003     return 0;
1004   }
1005 
1006 private:
1007   Group_bound_tracker bound_tracker;
1008   bool end_of_partition;
1009 };
1010 
1011 
1012 
1013 /////////////////////////////////////////////////////////////////////////////
1014 
1015 /*
1016   Window frame bound cursor. Abstract interface.
1017 
1018   @detail
1019     The cursor moves within the partition that the current row is in.
1020     It may be ahead or behind the current row.
1021 
1022     The cursor also assumes that the current row moves forward through the
1023     partition and will move to the next adjacent partition after this one.
1024 
1025     List of all cursor classes:
1026       Frame_cursor
1027         Frame_range_n_top
1028         Frame_range_n_bottom
1029 
1030         Frame_range_current_row_top
1031         Frame_range_current_row_bottom
1032 
1033         Frame_n_rows_preceding
1034         Frame_n_rows_following
1035 
1036         Frame_rows_current_row_top = Frame_n_rows_preceding(0)
1037         Frame_rows_current_row_bottom
1038 
1039         // These handle both RANGE and ROWS-type bounds
1040         Frame_unbounded_preceding
1041         Frame_unbounded_following
1042 
1043         // This is not used as a frame bound, it counts rows in the partition:
1044         Frame_unbounded_following_set_count : public Frame_unbounded_following
1045 
1046   @todo
1047   - if we want to allocate this on the MEM_ROOT we should make sure
1048     it is not re-allocated for every subquery execution.
1049 */
1050 
1051 class Frame_cursor : public Sql_alloc
1052 {
1053 public:
Frame_cursor()1054   Frame_cursor() : sum_functions(), perform_no_action(false) {}
1055 
init(READ_RECORD * info)1056   virtual void init(READ_RECORD *info) {};
1057 
add_sum_func(Item_sum * item)1058   bool add_sum_func(Item_sum* item)
1059   {
1060     return sum_functions.push_back(item);
1061   }
1062   /*
1063     Current row has moved to the next partition and is positioned on the first
1064     row there. Position the frame bound accordingly.
1065 
1066     @param first   -  TRUE means this is the first partition
1067     @param item    -  Put or remove rows from there.
1068 
1069     @detail
1070       - if first==false, the caller guarantees that tbl->record[0] points at the
1071         first row in the new partition.
1072       - if first==true, we are just starting in the first partition and no such
1073         guarantee is provided.
1074 
1075       - The callee may move tbl->file and tbl->record[0] to point to some other
1076         row.
1077   */
pre_next_partition(ha_rows rownum)1078   virtual void pre_next_partition(ha_rows rownum) {};
1079   virtual void next_partition(ha_rows rownum)=0;
1080 
1081   /*
1082     The current row has moved one row forward.
1083     Move this frame bound accordingly, and update the value of aggregate
1084     function as necessary.
1085   */
pre_next_row()1086   virtual void pre_next_row() {};
1087   virtual void next_row()=0;
1088 
is_outside_computation_bounds() const1089   virtual bool is_outside_computation_bounds() const { return false; };
1090 
~Frame_cursor()1091   virtual ~Frame_cursor() {}
1092 
1093   /*
1094      Regular frame cursors add or remove values from the sum functions they
1095      manage. By calling this method, they will only perform the required
1096      movement within the table, but no adding/removing will happen.
1097   */
set_no_action()1098   void set_no_action()
1099   {
1100     perform_no_action= true;
1101   }
1102 
1103   /* Retrieves the row number that this cursor currently points at. */
1104   virtual ha_rows get_curr_rownum() const= 0;
1105 
1106 protected:
add_value_to_items()1107   inline void add_value_to_items()
1108   {
1109     if (perform_no_action)
1110       return;
1111 
1112     List_iterator_fast<Item_sum> it(sum_functions);
1113     Item_sum *item_sum;
1114     while ((item_sum= it++))
1115     {
1116       item_sum->add();
1117     }
1118   }
1119 
remove_value_from_items()1120   inline void remove_value_from_items()
1121   {
1122     if (perform_no_action)
1123       return;
1124 
1125     List_iterator_fast<Item_sum> it(sum_functions);
1126     Item_sum *item_sum;
1127     while ((item_sum= it++))
1128     {
1129       item_sum->remove();
1130     }
1131   }
1132 
1133   /* Clear all sum functions handled by this cursor. */
clear_sum_functions()1134   void clear_sum_functions()
1135   {
1136     List_iterator_fast<Item_sum> iter_sum_func(sum_functions);
1137     Item_sum *sum_func;
1138     while ((sum_func= iter_sum_func++))
1139     {
1140       sum_func->clear();
1141     }
1142   }
1143 
1144   /* Sum functions that this cursor handles. */
1145   List<Item_sum> sum_functions;
1146 
1147 private:
1148   bool perform_no_action;
1149 };
1150 
1151 /*
1152   A class that owns cursor objects associated with a specific window function.
1153 */
1154 class Cursor_manager
1155 {
1156 public:
add_cursor(Frame_cursor * cursor)1157   bool add_cursor(Frame_cursor *cursor)
1158   {
1159     return cursors.push_back(cursor);
1160   }
1161 
initialize_cursors(READ_RECORD * info)1162   void initialize_cursors(READ_RECORD *info)
1163   {
1164     List_iterator_fast<Frame_cursor> iter(cursors);
1165     Frame_cursor *fc;
1166     while ((fc= iter++))
1167       fc->init(info);
1168   }
1169 
notify_cursors_partition_changed(ha_rows rownum)1170   void notify_cursors_partition_changed(ha_rows rownum)
1171   {
1172     List_iterator_fast<Frame_cursor> iter(cursors);
1173     Frame_cursor *cursor;
1174     while ((cursor= iter++))
1175       cursor->pre_next_partition(rownum);
1176 
1177     iter.rewind();
1178     while ((cursor= iter++))
1179       cursor->next_partition(rownum);
1180   }
1181 
notify_cursors_next_row()1182   void notify_cursors_next_row()
1183   {
1184     List_iterator_fast<Frame_cursor> iter(cursors);
1185     Frame_cursor *cursor;
1186     while ((cursor= iter++))
1187       cursor->pre_next_row();
1188 
1189     iter.rewind();
1190     while ((cursor= iter++))
1191       cursor->next_row();
1192   }
1193 
~Cursor_manager()1194   ~Cursor_manager() { cursors.delete_elements(); }
1195 
1196 private:
1197   /* List of the cursors that this manager owns. */
1198   List<Frame_cursor> cursors;
1199 };
1200 
1201 
1202 
1203 //////////////////////////////////////////////////////////////////////////////
1204 // RANGE-type frames
1205 //////////////////////////////////////////////////////////////////////////////
1206 
1207 /*
1208   Frame_range_n_top handles the top end of RANGE-type frame.
1209 
1210   That is, it handles:
1211     RANGE BETWEEN n PRECEDING AND ...
1212     RANGE BETWEEN n FOLLOWING AND ...
1213 
1214   Top of the frame doesn't need to check for partition end, since bottom will
1215   reach it before.
1216 */
1217 
1218 class Frame_range_n_top : public Frame_cursor
1219 {
1220   Partition_read_cursor cursor;
1221 
1222   Cached_item_item *range_expr;
1223 
1224   Item *n_val;
1225   Item *item_add;
1226 
1227   const bool is_preceding;
1228 
1229   bool end_of_partition;
1230 
1231   /*
1232      1  when order_list uses ASC ordering
1233     -1  when order_list uses DESC ordering
1234   */
1235   int order_direction;
1236 public:
Frame_range_n_top(THD * thd,SQL_I_List<ORDER> * partition_list,SQL_I_List<ORDER> * order_list,bool is_preceding_arg,Item * n_val_arg)1237   Frame_range_n_top(THD *thd,
1238                     SQL_I_List<ORDER> *partition_list,
1239                     SQL_I_List<ORDER> *order_list,
1240                     bool is_preceding_arg, Item *n_val_arg) :
1241     cursor(thd, partition_list), n_val(n_val_arg), item_add(NULL),
1242     is_preceding(is_preceding_arg)
1243   {
1244     DBUG_ASSERT(order_list->elements == 1);
1245     Item *src_expr= order_list->first->item[0];
1246     if (order_list->first->direction == ORDER::ORDER_ASC)
1247       order_direction= 1;
1248     else
1249       order_direction= -1;
1250 
1251     range_expr= (Cached_item_item*) new_Cached_item(thd, src_expr, FALSE);
1252 
1253     bool use_minus= is_preceding;
1254     if (order_direction == -1)
1255       use_minus= !use_minus;
1256 
1257     if (use_minus)
1258       item_add= new (thd->mem_root) Item_func_minus(thd, src_expr, n_val);
1259     else
1260       item_add= new (thd->mem_root) Item_func_plus(thd, src_expr, n_val);
1261 
1262     item_add->fix_fields(thd, &item_add);
1263   }
1264 
init(READ_RECORD * info)1265   void init(READ_RECORD *info)
1266   {
1267     cursor.init(info);
1268   }
1269 
pre_next_partition(ha_rows rownum)1270   void pre_next_partition(ha_rows rownum)
1271   {
1272     // Save the value of FUNC(current_row)
1273     range_expr->fetch_value_from(item_add);
1274 
1275     cursor.on_next_partition(rownum);
1276     end_of_partition= false;
1277   }
1278 
next_partition(ha_rows rownum)1279   void next_partition(ha_rows rownum)
1280   {
1281     walk_till_non_peer();
1282   }
1283 
pre_next_row()1284   void pre_next_row()
1285   {
1286     if (end_of_partition)
1287       return;
1288     range_expr->fetch_value_from(item_add);
1289   }
1290 
next_row()1291   void next_row()
1292   {
1293     if (end_of_partition)
1294       return;
1295     /*
1296       Ok, our cursor is at the first row R where
1297         (prev_row + n) >= R
1298       We need to check about the current row.
1299     */
1300     walk_till_non_peer();
1301   }
1302 
get_curr_rownum() const1303   ha_rows get_curr_rownum() const
1304   {
1305     return cursor.get_rownum();
1306   }
1307 
is_outside_computation_bounds() const1308   bool is_outside_computation_bounds() const
1309   {
1310     if (end_of_partition)
1311       return true;
1312     return false;
1313   }
1314 
1315 private:
walk_till_non_peer()1316   void walk_till_non_peer()
1317   {
1318     if (cursor.fetch()) // ERROR
1319       return;
1320     // Current row is not a peer.
1321     if (order_direction * range_expr->cmp_read_only() <= 0)
1322       return;
1323     remove_value_from_items();
1324 
1325     int res;
1326     while (!(res= cursor.next()))
1327     {
1328       /* Note, no need to fetch the value explicitly here. The partition
1329          read cursor will fetch it to check if the partition has changed.
1330          TODO(cvicentiu) make this piece of information not necessary by
1331          reimplementing Partition_read_cursor.
1332       */
1333       if (order_direction * range_expr->cmp_read_only() <= 0)
1334         break;
1335       remove_value_from_items();
1336     }
1337     if (res)
1338       end_of_partition= true;
1339   }
1340 
1341 };
1342 
1343 
1344 /*
1345   Frame_range_n_bottom handles bottom end of RANGE-type frame.
1346 
1347   That is, it handles frame bounds in form:
1348     RANGE BETWEEN ... AND n PRECEDING
1349     RANGE BETWEEN ... AND n FOLLOWING
1350 
1351   Bottom end moves first so it needs to check for partition end
1352   (todo: unless it's PRECEDING and in that case it doesnt)
1353   (todo: factor out common parts with Frame_range_n_top into
1354    a common ancestor)
1355 */
1356 
1357 class Frame_range_n_bottom: public Frame_cursor
1358 {
1359   Partition_read_cursor cursor;
1360 
1361   Cached_item_item *range_expr;
1362 
1363   Item *n_val;
1364   Item *item_add;
1365 
1366   const bool is_preceding;
1367 
1368   bool end_of_partition;
1369 
1370   /*
1371      1  when order_list uses ASC ordering
1372     -1  when order_list uses DESC ordering
1373   */
1374   int order_direction;
1375 public:
Frame_range_n_bottom(THD * thd,SQL_I_List<ORDER> * partition_list,SQL_I_List<ORDER> * order_list,bool is_preceding_arg,Item * n_val_arg)1376   Frame_range_n_bottom(THD *thd,
1377                        SQL_I_List<ORDER> *partition_list,
1378                        SQL_I_List<ORDER> *order_list,
1379                        bool is_preceding_arg, Item *n_val_arg) :
1380     cursor(thd, partition_list), n_val(n_val_arg), item_add(NULL),
1381     is_preceding(is_preceding_arg), added_values(false)
1382   {
1383     DBUG_ASSERT(order_list->elements == 1);
1384     Item *src_expr= order_list->first->item[0];
1385 
1386     if (order_list->first->direction == ORDER::ORDER_ASC)
1387       order_direction= 1;
1388     else
1389       order_direction= -1;
1390 
1391     range_expr= (Cached_item_item*) new_Cached_item(thd, src_expr, FALSE);
1392 
1393     bool use_minus= is_preceding;
1394     if (order_direction == -1)
1395       use_minus= !use_minus;
1396 
1397     if (use_minus)
1398       item_add= new (thd->mem_root) Item_func_minus(thd, src_expr, n_val);
1399     else
1400       item_add= new (thd->mem_root) Item_func_plus(thd, src_expr, n_val);
1401 
1402     item_add->fix_fields(thd, &item_add);
1403   }
1404 
init(READ_RECORD * info)1405   void init(READ_RECORD *info)
1406   {
1407     cursor.init(info);
1408   }
1409 
pre_next_partition(ha_rows rownum)1410   void pre_next_partition(ha_rows rownum)
1411   {
1412     // Save the value of FUNC(current_row)
1413     range_expr->fetch_value_from(item_add);
1414 
1415     cursor.on_next_partition(rownum);
1416     end_of_partition= false;
1417     added_values= false;
1418   }
1419 
next_partition(ha_rows rownum)1420   void next_partition(ha_rows rownum)
1421   {
1422     cursor.move_to(rownum);
1423     walk_till_non_peer();
1424   }
1425 
pre_next_row()1426   void pre_next_row()
1427   {
1428     if (end_of_partition)
1429       return;
1430     range_expr->fetch_value_from(item_add);
1431   }
1432 
next_row()1433   void next_row()
1434   {
1435     if (end_of_partition)
1436       return;
1437     /*
1438       Ok, our cursor is at the first row R where
1439         (prev_row + n) >= R
1440       We need to check about the current row.
1441     */
1442     walk_till_non_peer();
1443   }
1444 
is_outside_computation_bounds() const1445   bool is_outside_computation_bounds() const
1446   {
1447     if (!added_values)
1448       return true;
1449     return false;
1450   }
1451 
get_curr_rownum() const1452   ha_rows get_curr_rownum() const
1453   {
1454     if (end_of_partition)
1455       return cursor.get_rownum(); // Cursor does not pass over partition bound.
1456     else
1457       return cursor.get_rownum() - 1; // Cursor is placed on first non peer.
1458   }
1459 
1460 private:
1461   bool added_values;
1462 
walk_till_non_peer()1463   void walk_till_non_peer()
1464   {
1465     cursor.fetch();
1466     // Current row is not a peer.
1467     if (order_direction * range_expr->cmp_read_only() < 0)
1468       return;
1469 
1470     add_value_to_items(); // Add current row.
1471     added_values= true;
1472     int res;
1473     while (!(res= cursor.next()))
1474     {
1475       if (order_direction * range_expr->cmp_read_only() < 0)
1476         break;
1477       add_value_to_items();
1478     }
1479     if (res)
1480       end_of_partition= true;
1481   }
1482 };
1483 
1484 
1485 /*
1486   RANGE BETWEEN ... AND CURRENT ROW, bottom frame bound for CURRENT ROW
1487      ...
1488    | peer1
1489    | peer2  <----- current_row
1490    | peer3
1491    +-peer4  <----- the cursor points here. peer4 itself is included.
1492      nonpeer1
1493      nonpeer2
1494 
  This bound moves in front of the current_row. It should be at the last row
  that is still a peer of the current row.
1497 */
1498 
1499 class Frame_range_current_row_bottom: public Frame_cursor
1500 {
1501   Partition_read_cursor cursor;
1502 
1503   Group_bound_tracker peer_tracker;
1504 
1505   bool dont_move;
1506 public:
Frame_range_current_row_bottom(THD * thd,SQL_I_List<ORDER> * partition_list,SQL_I_List<ORDER> * order_list)1507   Frame_range_current_row_bottom(THD *thd,
1508                                  SQL_I_List<ORDER> *partition_list,
1509                                  SQL_I_List<ORDER> *order_list) :
1510     cursor(thd, partition_list), peer_tracker(thd, order_list)
1511   {
1512   }
1513 
init(READ_RECORD * info)1514   void init(READ_RECORD *info)
1515   {
1516     cursor.init(info);
1517     peer_tracker.init();
1518   }
1519 
pre_next_partition(ha_rows rownum)1520   void pre_next_partition(ha_rows rownum)
1521   {
1522     // Save the value of the current_row
1523     peer_tracker.check_if_next_group();
1524     cursor.on_next_partition(rownum);
1525     // Add the current row now because our cursor has already seen it
1526     add_value_to_items();
1527   }
1528 
next_partition(ha_rows rownum)1529   void next_partition(ha_rows rownum)
1530   {
1531     walk_till_non_peer();
1532   }
1533 
pre_next_row()1534   void pre_next_row()
1535   {
1536     dont_move= !peer_tracker.check_if_next_group();
1537   }
1538 
next_row()1539   void next_row()
1540   {
1541     // Check if our cursor is pointing at a peer of the current row.
1542     // If not, move forward until that becomes true
1543     if (dont_move)
1544     {
1545       /*
1546         Our current is not a peer of the current row.
1547         No need to move the bound.
1548       */
1549       return;
1550     }
1551     walk_till_non_peer();
1552   }
1553 
get_curr_rownum() const1554   ha_rows get_curr_rownum() const
1555   {
1556     return cursor.get_rownum();
1557   }
1558 
1559 private:
walk_till_non_peer()1560   void walk_till_non_peer()
1561   {
1562     /*
1563       Walk forward until we've met first row that's not a peer of the current
1564       row
1565     */
1566     while (!cursor.next())
1567     {
1568       if (peer_tracker.compare_with_cache())
1569       {
1570         cursor.prev(); // Move to our peer.
1571         break;
1572       }
1573 
1574       add_value_to_items();
1575     }
1576   }
1577 };
1578 
1579 
1580 /*
1581   RANGE BETWEEN CURRENT ROW AND .... Top CURRENT ROW, RANGE-type frame bound
1582 
1583       nonpeer1
1584       nonpeer2
1585     +-peer1  <----- the cursor points here. peer1 itself is included.
1586     | peer2
1587     | peer3  <----- current_row
1588     | peer4
1589       ...
1590 
1591   It moves behind the current_row. It is located right after the first peer of
1592   the current_row.
1593 */
1594 
1595 class Frame_range_current_row_top : public Frame_cursor
1596 {
1597   Group_bound_tracker bound_tracker;
1598 
1599   Table_read_cursor cursor;
1600   Group_bound_tracker peer_tracker;
1601 
1602   bool move;
1603 public:
Frame_range_current_row_top(THD * thd,SQL_I_List<ORDER> * partition_list,SQL_I_List<ORDER> * order_list)1604   Frame_range_current_row_top(THD *thd,
1605                               SQL_I_List<ORDER> *partition_list,
1606                               SQL_I_List<ORDER> *order_list) :
1607     bound_tracker(thd, partition_list), cursor(), peer_tracker(thd, order_list),
1608     move(false)
1609   {}
1610 
init(READ_RECORD * info)1611   void init(READ_RECORD *info)
1612   {
1613     bound_tracker.init();
1614 
1615     cursor.init(info);
1616     peer_tracker.init();
1617   }
1618 
pre_next_partition(ha_rows rownum)1619   void pre_next_partition(ha_rows rownum)
1620   {
1621     // Fetch the value from the first row
1622     peer_tracker.check_if_next_group();
1623     cursor.move_to(rownum);
1624   }
1625 
next_partition(ha_rows rownum)1626   void next_partition(ha_rows rownum) {}
1627 
pre_next_row()1628   void pre_next_row()
1629   {
1630     // Check if the new current_row is a peer of the row that our cursor is
1631     // pointing to.
1632     move= peer_tracker.check_if_next_group();
1633   }
1634 
next_row()1635   void next_row()
1636   {
1637     if (move)
1638     {
1639       /*
1640         Our cursor is pointing at the first row that was a peer of the previous
1641         current row. Or, it was the first row in the partition.
1642       */
1643       if (cursor.fetch())
1644         return;
1645 
1646       // todo: need the following check ?
1647       if (!peer_tracker.compare_with_cache())
1648         return;
1649       remove_value_from_items();
1650 
1651       do
1652       {
1653         if (cursor.next() || cursor.fetch())
1654           return;
1655         if (!peer_tracker.compare_with_cache())
1656           return;
1657         remove_value_from_items();
1658       }
1659       while (1);
1660     }
1661   }
1662 
get_curr_rownum() const1663   ha_rows get_curr_rownum() const
1664   {
1665     return cursor.get_rownum();
1666   }
1667 };
1668 
1669 
1670 /////////////////////////////////////////////////////////////////////////////
1671 // UNBOUNDED frame bounds (shared between RANGE and ROWS)
1672 /////////////////////////////////////////////////////////////////////////////
1673 
1674 /*
1675   UNBOUNDED PRECEDING frame bound
1676 */
1677 class Frame_unbounded_preceding : public Frame_cursor
1678 {
1679 public:
Frame_unbounded_preceding(THD * thd,SQL_I_List<ORDER> * partition_list,SQL_I_List<ORDER> * order_list)1680   Frame_unbounded_preceding(THD *thd,
1681                             SQL_I_List<ORDER> *partition_list,
1682                             SQL_I_List<ORDER> *order_list)
1683   {}
1684 
init(READ_RECORD * info)1685   void init(READ_RECORD *info) {}
1686 
next_partition(ha_rows rownum)1687   void next_partition(ha_rows rownum)
1688   {
1689     /*
1690       UNBOUNDED PRECEDING frame end just stays on the first row of the
1691       partition. We are top of the frame, so we don't need to update the sum
1692       function.
1693     */
1694     curr_rownum= rownum;
1695   }
1696 
next_row()1697   void next_row()
1698   {
1699     /* Do nothing, UNBOUNDED PRECEDING frame end doesn't move. */
1700   }
1701 
get_curr_rownum() const1702   ha_rows get_curr_rownum() const
1703   {
1704     return curr_rownum;
1705   }
1706 
1707 private:
1708   ha_rows curr_rownum;
1709 };
1710 
1711 
1712 /*
1713   UNBOUNDED FOLLOWING frame bound
1714 */
1715 
1716 class Frame_unbounded_following : public Frame_cursor
1717 {
1718 protected:
1719   Partition_read_cursor cursor;
1720 
1721 public:
Frame_unbounded_following(THD * thd,SQL_I_List<ORDER> * partition_list,SQL_I_List<ORDER> * order_list)1722   Frame_unbounded_following(THD *thd,
1723       SQL_I_List<ORDER> *partition_list,
1724       SQL_I_List<ORDER> *order_list) :
1725     cursor(thd, partition_list) {}
1726 
init(READ_RECORD * info)1727   void init(READ_RECORD *info)
1728   {
1729     cursor.init(info);
1730   }
1731 
pre_next_partition(ha_rows rownum)1732   void pre_next_partition(ha_rows rownum)
1733   {
1734     cursor.on_next_partition(rownum);
1735   }
1736 
next_partition(ha_rows rownum)1737   void next_partition(ha_rows rownum)
1738   {
1739     /* Activate the first row */
1740     cursor.fetch();
1741     add_value_to_items();
1742 
1743     /* Walk to the end of the partition, updating the SUM function */
1744     while (!cursor.next())
1745     {
1746       add_value_to_items();
1747     }
1748   }
1749 
next_row()1750   void next_row()
1751   {
1752     /* Do nothing, UNBOUNDED FOLLOWING frame end doesn't move */
1753   }
1754 
get_curr_rownum() const1755   ha_rows get_curr_rownum() const
1756   {
1757     return cursor.get_rownum();
1758   }
1759 };
1760 
1761 
1762 class Frame_unbounded_following_set_count : public Frame_unbounded_following
1763 {
1764 public:
Frame_unbounded_following_set_count(THD * thd,SQL_I_List<ORDER> * partition_list,SQL_I_List<ORDER> * order_list)1765   Frame_unbounded_following_set_count(
1766       THD *thd,
1767       SQL_I_List<ORDER> *partition_list, SQL_I_List<ORDER> *order_list) :
1768     Frame_unbounded_following(thd, partition_list, order_list) {}
1769 
next_partition(ha_rows rownum)1770   void next_partition(ha_rows rownum)
1771   {
1772     ha_rows num_rows_in_partition= 0;
1773     if (cursor.fetch())
1774       return;
1775     num_rows_in_partition++;
1776 
1777     /* Walk to the end of the partition, find how many rows there are. */
1778     while (!cursor.next())
1779       num_rows_in_partition++;
1780     set_win_funcs_row_count(num_rows_in_partition);
1781   }
1782 
get_curr_rownum() const1783   ha_rows get_curr_rownum() const
1784   {
1785     return cursor.get_rownum();
1786   }
1787 
1788 protected:
set_win_funcs_row_count(ha_rows num_rows_in_partition)1789   void set_win_funcs_row_count(ha_rows num_rows_in_partition)
1790   {
1791     List_iterator_fast<Item_sum> it(sum_functions);
1792     Item_sum* item;
1793     while ((item= it++))
1794       item->set_partition_row_count(num_rows_in_partition);
1795   }
1796 };
1797 
1798 class Frame_unbounded_following_set_count_no_nulls:
1799             public Frame_unbounded_following_set_count
1800 {
1801 
1802 public:
Frame_unbounded_following_set_count_no_nulls(THD * thd,SQL_I_List<ORDER> * partition_list,SQL_I_List<ORDER> * order_list)1803   Frame_unbounded_following_set_count_no_nulls(THD *thd,
1804       SQL_I_List<ORDER> *partition_list,
1805       SQL_I_List<ORDER> *order_list) :
1806   Frame_unbounded_following_set_count(thd,partition_list, order_list)
1807   {
1808     order_item= order_list->first->item[0];
1809   }
next_partition(ha_rows rownum)1810   void next_partition(ha_rows rownum)
1811   {
1812     ha_rows num_rows_in_partition= 0;
1813     if (cursor.fetch())
1814       return;
1815 
1816     /* Walk to the end of the partition, find how many rows there are. */
1817     do
1818     {
1819       if (!order_item->is_null())
1820         num_rows_in_partition++;
1821     } while (!cursor.next());
1822 
1823     set_win_funcs_row_count(num_rows_in_partition);
1824   }
1825 
get_curr_rownum() const1826   ha_rows get_curr_rownum() const
1827   {
1828     return cursor.get_rownum();
1829   }
1830 
1831 private:
1832   Item* order_item;
1833 };
1834 
1835 /////////////////////////////////////////////////////////////////////////////
1836 // ROWS-type frame bounds
1837 /////////////////////////////////////////////////////////////////////////////
1838 /*
1839   ROWS $n PRECEDING frame bound
1840 
1841 */
1842 class Frame_n_rows_preceding : public Frame_cursor
1843 {
1844   /* Whether this is top of the frame or bottom */
1845   const bool is_top_bound;
1846   const ha_rows n_rows;
1847 
1848   /* Number of rows that we need to skip before our cursor starts moving */
1849   ha_rows n_rows_behind;
1850 
1851   Table_read_cursor cursor;
1852 public:
Frame_n_rows_preceding(bool is_top_bound_arg,ha_rows n_rows_arg)1853   Frame_n_rows_preceding(bool is_top_bound_arg, ha_rows n_rows_arg) :
1854     is_top_bound(is_top_bound_arg), n_rows(n_rows_arg), n_rows_behind(0)
1855   {}
1856 
init(READ_RECORD * info)1857   void init(READ_RECORD *info)
1858   {
1859     cursor.init(info);
1860   }
1861 
next_partition(ha_rows rownum)1862   void next_partition(ha_rows rownum)
1863   {
1864     /*
1865       Position our cursor to point at the first row in the new partition
1866       (for rownum=0, it is already there, otherwise, it lags behind)
1867     */
1868     cursor.move_to(rownum);
1869     /* Cursor is in the same spot as current row. */
1870     n_rows_behind= 0;
1871 
1872     /*
1873       Suppose the bound is ROWS 2 PRECEDING, and current row is row#n:
1874         ...
1875         n-3
1876         n-2 --- bound row
1877         n-1
1878          n  --- current_row
1879         ...
1880        The bound should point at row #(n-2). Bounds are inclusive, so
1881         - bottom bound should add row #(n-2) into the window function
1882         - top bound should remove row (#n-3) from the window function.
1883     */
1884     move_cursor_if_possible();
1885 
1886   }
1887 
next_row()1888   void next_row()
1889   {
1890     n_rows_behind++;
1891     move_cursor_if_possible();
1892   }
1893 
is_outside_computation_bounds() const1894   bool is_outside_computation_bounds() const
1895   {
1896     /* As a bottom boundary, rows have not yet been added. */
1897     if (!is_top_bound && n_rows - n_rows_behind)
1898       return true;
1899     return false;
1900   }
1901 
get_curr_rownum() const1902   ha_rows get_curr_rownum() const
1903   {
1904     return cursor.get_rownum();
1905   }
1906 
1907 private:
move_cursor_if_possible()1908   void move_cursor_if_possible()
1909   {
1910     longlong rows_difference= n_rows - n_rows_behind;
1911     if (rows_difference > 0) /* We still have to wait. */
1912       return;
1913 
1914       /* The cursor points to the first row in the frame. */
1915     if (rows_difference == 0)
1916     {
1917       if (!is_top_bound)
1918       {
1919         cursor.fetch();
1920         add_value_to_items();
1921       }
1922       /* For top bound we don't have to remove anything as nothing was added. */
1923       return;
1924     }
1925 
1926     /* We need to catch up by one row. */
1927     DBUG_ASSERT(rows_difference == -1);
1928 
1929     if (is_top_bound)
1930     {
1931       cursor.fetch();
1932       remove_value_from_items();
1933       cursor.next();
1934     }
1935     else
1936     {
1937       cursor.next();
1938       cursor.fetch();
1939       add_value_to_items();
1940     }
1941     /* We've advanced one row. We are no longer behind. */
1942     n_rows_behind--;
1943   }
1944 };
1945 
1946 
1947 /*
1948   ROWS ... CURRENT ROW, Bottom bound.
1949 
1950   This case is moved to separate class because here we don't need to maintain
1951   our own cursor, or check for partition bound.
1952 */
1953 
1954 class Frame_rows_current_row_bottom : public Frame_cursor
1955 {
1956 public:
1957 
Frame_rows_current_row_bottom()1958   Frame_rows_current_row_bottom() : curr_rownum(0) {}
1959 
pre_next_partition(ha_rows rownum)1960   void pre_next_partition(ha_rows rownum)
1961   {
1962     add_value_to_items();
1963     curr_rownum= rownum;
1964   }
1965 
next_partition(ha_rows rownum)1966   void next_partition(ha_rows rownum) {}
1967 
pre_next_row()1968   void pre_next_row()
1969   {
1970     /* Temp table's current row is current_row. Add it to the window func */
1971     add_value_to_items();
1972   }
1973 
next_row()1974   void next_row()
1975   {
1976     curr_rownum++;
1977   };
1978 
get_curr_rownum() const1979   ha_rows get_curr_rownum() const
1980   {
1981     return curr_rownum;
1982   }
1983 
1984 private:
1985   ha_rows curr_rownum;
1986 };
1987 
1988 
1989 /*
1990   ROWS-type CURRENT ROW, top bound.
1991 
1992   This serves for processing "ROWS BETWEEN CURRENT ROW AND ..." frames.
1993 
1994       n-1
1995        n  --+  --- current_row, and top frame bound
1996       n+1   |
1997       ...   |
1998 
1999   when the current_row moves to row #n, this frame bound should remove the
2000   row #(n-1) from the window function.
2001 
2002   In other words, we need what "ROWS PRECEDING 0" provides.
2003 */
2004 class Frame_rows_current_row_top: public Frame_n_rows_preceding
2005 
2006 {
2007 public:
Frame_rows_current_row_top()2008   Frame_rows_current_row_top() :
2009     Frame_n_rows_preceding(true /*top*/, 0 /* n_rows */)
2010   {}
2011 };
2012 
2013 
2014 /*
2015   ROWS $n FOLLOWING frame bound.
2016 */
2017 
2018 class Frame_n_rows_following : public Frame_cursor
2019 {
2020   /* Whether this is top of the frame or bottom */
2021   const bool is_top_bound;
2022   const ha_rows n_rows;
2023 
2024   Partition_read_cursor cursor;
2025   bool at_partition_end;
2026 public:
Frame_n_rows_following(THD * thd,SQL_I_List<ORDER> * partition_list,SQL_I_List<ORDER> * order_list,bool is_top_bound_arg,ha_rows n_rows_arg)2027   Frame_n_rows_following(THD *thd,
2028             SQL_I_List<ORDER> *partition_list,
2029             SQL_I_List<ORDER> *order_list,
2030             bool is_top_bound_arg, ha_rows n_rows_arg) :
2031     is_top_bound(is_top_bound_arg), n_rows(n_rows_arg),
2032     cursor(thd, partition_list)
2033   {
2034   }
2035 
init(READ_RECORD * info)2036   void init(READ_RECORD *info)
2037   {
2038     cursor.init(info);
2039     at_partition_end= false;
2040   }
2041 
pre_next_partition(ha_rows rownum)2042   void pre_next_partition(ha_rows rownum)
2043   {
2044     at_partition_end= false;
2045 
2046     cursor.on_next_partition(rownum);
2047   }
2048 
2049   /* Move our cursor to be n_rows ahead.  */
next_partition(ha_rows rownum)2050   void next_partition(ha_rows rownum)
2051   {
2052     if (is_top_bound)
2053       next_part_top(rownum);
2054     else
2055       next_part_bottom(rownum);
2056   }
2057 
next_row()2058   void next_row()
2059   {
2060     if (is_top_bound)
2061       next_row_top();
2062     else
2063       next_row_bottom();
2064   }
2065 
is_outside_computation_bounds() const2066   bool is_outside_computation_bounds() const
2067   {
2068     /*
2069        The top bound can go over the current partition. In this case,
2070        the sum function has 0 values added to it.
2071     */
2072     if (at_partition_end && is_top_bound)
2073       return true;
2074     return false;
2075   }
2076 
get_curr_rownum() const2077   ha_rows get_curr_rownum() const
2078   {
2079     return cursor.get_rownum();
2080   }
2081 
2082 private:
next_part_top(ha_rows rownum)2083   void next_part_top(ha_rows rownum)
2084   {
2085     for (ha_rows i= 0; i < n_rows; i++)
2086     {
2087       if (cursor.fetch())
2088         break;
2089       remove_value_from_items();
2090       if (cursor.next())
2091         at_partition_end= true;
2092     }
2093   }
2094 
next_part_bottom(ha_rows rownum)2095   void next_part_bottom(ha_rows rownum)
2096   {
2097     if (cursor.fetch())
2098       return;
2099     add_value_to_items();
2100 
2101     for (ha_rows i= 0; i < n_rows; i++)
2102     {
2103       if (cursor.next())
2104       {
2105         at_partition_end= true;
2106         break;
2107       }
2108       add_value_to_items();
2109     }
2110     return;
2111   }
2112 
next_row_top()2113   void next_row_top()
2114   {
2115     if (cursor.fetch()) // PART END OR FAILURE
2116     {
2117       at_partition_end= true;
2118       return;
2119     }
2120     remove_value_from_items();
2121     if (cursor.next())
2122     {
2123       at_partition_end= true;
2124       return;
2125     }
2126   }
2127 
next_row_bottom()2128   void next_row_bottom()
2129   {
2130     if (at_partition_end)
2131       return;
2132 
2133     if (cursor.next())
2134     {
2135       at_partition_end= true;
2136       return;
2137     }
2138 
2139     add_value_to_items();
2140 
2141   }
2142 };
2143 
2144 /*
2145   A cursor that performs a table scan between two indices. The indices
2146   are provided by the two cursors representing the top and bottom bound
2147   of the window function's frame definition.
2148 
2149   Each scan clears the sum function.
2150 
2151   NOTE:
2152     The cursor does not alter the top and bottom cursors.
2153     This type of cursor is expensive computational wise. This is only to be
2154     used when the sum functions do not support removal.
2155 */
2156 class Frame_scan_cursor : public Frame_cursor
2157 {
2158 public:
Frame_scan_cursor(const Frame_cursor & top_bound,const Frame_cursor & bottom_bound)2159   Frame_scan_cursor(const Frame_cursor &top_bound,
2160                     const Frame_cursor &bottom_bound) :
2161     top_bound(top_bound), bottom_bound(bottom_bound) {}
2162 
init(READ_RECORD * info)2163   void init(READ_RECORD *info)
2164   {
2165     cursor.init(info);
2166   }
2167 
pre_next_partition(ha_rows rownum)2168   void pre_next_partition(ha_rows rownum)
2169   {
2170     /* TODO(cvicentiu) Sum functions get cleared on next partition anyway during
2171        the window function computation algorithm. Either perform this only in
2172        cursors, or remove it from pre_next_partition.
2173     */
2174     curr_rownum= rownum;
2175     clear_sum_functions();
2176   }
2177 
next_partition(ha_rows rownum)2178   void next_partition(ha_rows rownum)
2179   {
2180     compute_values_for_current_row();
2181   }
2182 
pre_next_row()2183   void pre_next_row()
2184   {
2185     clear_sum_functions();
2186   }
2187 
next_row()2188   void next_row()
2189   {
2190     curr_rownum++;
2191     compute_values_for_current_row();
2192   }
2193 
get_curr_rownum() const2194   ha_rows get_curr_rownum() const
2195   {
2196     return curr_rownum;
2197   }
2198 
2199 private:
2200   const Frame_cursor &top_bound;
2201   const Frame_cursor &bottom_bound;
2202   Table_read_cursor cursor;
2203   ha_rows curr_rownum;
2204 
2205   /* Scan the rows between the top bound and bottom bound. Add all the values
2206      between them, top bound row  and bottom bound row inclusive. */
compute_values_for_current_row()2207   void compute_values_for_current_row()
2208   {
2209     if (top_bound.is_outside_computation_bounds() ||
2210         bottom_bound.is_outside_computation_bounds())
2211       return;
2212 
2213     ha_rows start_rownum= top_bound.get_curr_rownum();
2214     ha_rows bottom_rownum= bottom_bound.get_curr_rownum();
2215     DBUG_PRINT("info", ("COMPUTING (%llu %llu)", start_rownum, bottom_rownum));
2216 
2217     cursor.move_to(start_rownum);
2218 
2219     for (ha_rows idx= start_rownum; idx <= bottom_rownum; idx++)
2220     {
2221       if (cursor.fetch()) //EOF
2222         break;
2223       add_value_to_items();
2224       if (cursor.next()) // EOF
2225         break;
2226     }
2227   }
2228 };
2229 
2230 /* A cursor that follows a target cursor. Each time a new row is added,
2231    the window functions are cleared and only have the row at which the target
2232    is point at added to them.
2233 
2234    The window functions are cleared if the bounds or the position cursors are
2235    outside computational bounds.
2236 */
2237 class Frame_positional_cursor : public Frame_cursor
2238 {
2239  public:
Frame_positional_cursor(const Frame_cursor & position_cursor)2240   Frame_positional_cursor(const Frame_cursor &position_cursor) :
2241     position_cursor(position_cursor), top_bound(NULL),
2242     bottom_bound(NULL), offset(NULL), overflowed(false),
2243     negative_offset(false) {}
2244 
Frame_positional_cursor(const Frame_cursor & position_cursor,const Frame_cursor & top_bound,const Frame_cursor & bottom_bound,Item & offset,bool negative_offset)2245   Frame_positional_cursor(const Frame_cursor &position_cursor,
2246                           const Frame_cursor &top_bound,
2247                           const Frame_cursor &bottom_bound,
2248                           Item &offset,
2249                           bool negative_offset) :
2250     position_cursor(position_cursor), top_bound(&top_bound),
2251     bottom_bound(&bottom_bound), offset(&offset),
2252     negative_offset(negative_offset) {}
2253 
init(READ_RECORD * info)2254   void init(READ_RECORD *info)
2255   {
2256     cursor.init(info);
2257   }
2258 
pre_next_partition(ha_rows rownum)2259   void pre_next_partition(ha_rows rownum)
2260   {
2261     /* The offset is dependant on the current row values. We can only get
2262      * it here accurately. When fetching other rows, it changes. */
2263     save_offset_value();
2264   }
2265 
next_partition(ha_rows rownum)2266   void next_partition(ha_rows rownum)
2267   {
2268     save_positional_value();
2269   }
2270 
pre_next_row()2271   void pre_next_row()
2272   {
2273     /* The offset is dependant on the current row values. We can only get
2274      * it here accurately. When fetching other rows, it changes. */
2275     save_offset_value();
2276   }
2277 
next_row()2278   void next_row()
2279   {
2280     save_positional_value();
2281   }
2282 
get_curr_rownum() const2283   ha_rows get_curr_rownum() const
2284   {
2285     return position_cursor.get_curr_rownum();
2286   }
2287 
2288 private:
2289   /* Check if a our position is within bounds.
2290    * The position is passed as a parameter to avoid recalculating it. */
position_is_within_bounds()2291   bool position_is_within_bounds()
2292   {
2293     if (!offset)
2294       return !position_cursor.is_outside_computation_bounds();
2295 
2296     if (overflowed)
2297       return false;
2298 
2299     /* No valid bound to compare to. */
2300     if (position_cursor.is_outside_computation_bounds() ||
2301         top_bound->is_outside_computation_bounds() ||
2302         bottom_bound->is_outside_computation_bounds())
2303       return false;
2304 
2305     /* We are over the bound. */
2306     if (position < top_bound->get_curr_rownum())
2307       return false;
2308     if (position > bottom_bound->get_curr_rownum())
2309       return false;
2310 
2311     return true;
2312   }
2313 
2314   /* Get the current position, accounting for the offset value, if present.
2315      NOTE: This function does not check over/underflow.
2316   */
get_current_position()2317   void get_current_position()
2318   {
2319     position = position_cursor.get_curr_rownum();
2320     overflowed= false;
2321     if (offset)
2322     {
2323       if (offset_value < 0 &&
2324           position + offset_value > position)
2325       {
2326         overflowed= true;
2327       }
2328       if (offset_value > 0 &&
2329           position + offset_value < position)
2330       {
2331         overflowed= true;
2332       }
2333       position += offset_value;
2334     }
2335   }
2336 
save_offset_value()2337   void save_offset_value()
2338   {
2339     if (offset)
2340       offset_value= offset->val_int() * (negative_offset ? -1 : 1);
2341     else
2342       offset_value= 0;
2343   }
2344 
save_positional_value()2345   void save_positional_value()
2346   {
2347     get_current_position();
2348     if (!position_is_within_bounds())
2349       clear_sum_functions();
2350     else
2351     {
2352       cursor.move_to(position);
2353       cursor.fetch();
2354       add_value_to_items();
2355     }
2356   }
2357 
2358   const Frame_cursor &position_cursor;
2359   const Frame_cursor *top_bound;
2360   const Frame_cursor *bottom_bound;
2361   Item *offset;
2362   Table_read_cursor cursor;
2363   ha_rows position;
2364   longlong offset_value;
2365   bool overflowed;
2366 
2367   bool negative_offset;
2368 };
2369 
2370 
2371 /*
2372   Get a Frame_cursor for a frame bound. This is a "factory function".
2373 */
get_frame_cursor(THD * thd,Window_spec * spec,bool is_top_bound)2374 Frame_cursor *get_frame_cursor(THD *thd, Window_spec *spec, bool is_top_bound)
2375 {
2376   Window_frame *frame= spec->window_frame;
2377   if (!frame)
2378   {
2379     /*
2380       The docs say this about the lack of frame clause:
2381 
2382         Let WD be a window structure descriptor.
2383         ...
2384         If WD has no window framing clause, then
2385         Case:
2386         i) If the window ordering clause of WD is not present, then WF is the
2387            window partition of R.
2388         ii) Otherwise, WF consists of all rows of the partition of R that
2389            precede R or are peers of R in the window ordering of the window
2390            partition defined by the window ordering clause.
2391 
2392         For case #ii, the frame bounds essentially are "RANGE BETWEEN UNBOUNDED
2393         PRECEDING AND CURRENT ROW".
2394         For the case #i, without ordering clause all rows are considered peers,
2395         so again the same frame bounds can be used.
2396     */
2397     if (is_top_bound)
2398       return new Frame_unbounded_preceding(thd,
2399                                            spec->partition_list,
2400                                            spec->order_list);
2401     else
2402       return new Frame_range_current_row_bottom(thd,
2403                                                 spec->partition_list,
2404                                                 spec->order_list);
2405   }
2406 
2407   Window_frame_bound *bound= is_top_bound? frame->top_bound :
2408                                            frame->bottom_bound;
2409 
2410   if (bound->precedence_type == Window_frame_bound::PRECEDING ||
2411       bound->precedence_type == Window_frame_bound::FOLLOWING)
2412   {
2413     bool is_preceding= (bound->precedence_type ==
2414                         Window_frame_bound::PRECEDING);
2415 
2416     if (bound->offset == NULL) /* this is UNBOUNDED */
2417     {
2418       /* The following serve both RANGE and ROWS: */
2419       if (is_preceding)
2420         return new Frame_unbounded_preceding(thd,
2421                                              spec->partition_list,
2422                                              spec->order_list);
2423 
2424       return new Frame_unbounded_following(thd,
2425                                            spec->partition_list,
2426                                            spec->order_list);
2427     }
2428 
2429     if (frame->units == Window_frame::UNITS_ROWS)
2430     {
2431       ha_rows n_rows= bound->offset->val_int();
2432       /* These should be handled in the parser */
2433       DBUG_ASSERT(!bound->offset->null_value);
2434       DBUG_ASSERT((longlong) n_rows >= 0);
2435       if (is_preceding)
2436         return new Frame_n_rows_preceding(is_top_bound, n_rows);
2437 
2438       return new Frame_n_rows_following(
2439           thd, spec->partition_list, spec->order_list,
2440           is_top_bound, n_rows);
2441     }
2442     else
2443     {
2444       if (is_top_bound)
2445         return new Frame_range_n_top(
2446             thd, spec->partition_list, spec->order_list,
2447             is_preceding, bound->offset);
2448 
2449       return new Frame_range_n_bottom(thd,
2450           spec->partition_list, spec->order_list,
2451           is_preceding, bound->offset);
2452     }
2453   }
2454 
2455   if (bound->precedence_type == Window_frame_bound::CURRENT)
2456   {
2457     if (frame->units == Window_frame::UNITS_ROWS)
2458     {
2459       if (is_top_bound)
2460         return new Frame_rows_current_row_top;
2461 
2462       return new Frame_rows_current_row_bottom;
2463     }
2464     else
2465     {
2466       if (is_top_bound)
2467         return new Frame_range_current_row_top(
2468             thd, spec->partition_list, spec->order_list);
2469 
2470       return new Frame_range_current_row_bottom(
2471           thd, spec->partition_list, spec->order_list);
2472     }
2473   }
2474   return NULL;
2475 }
2476 
2477 static
add_special_frame_cursors(THD * thd,Cursor_manager * cursor_manager,Item_window_func * window_func)2478 void add_special_frame_cursors(THD *thd, Cursor_manager *cursor_manager,
2479                                Item_window_func *window_func)
2480 {
2481   Window_spec *spec= window_func->window_spec;
2482   Item_sum *item_sum= window_func->window_func();
2483   DBUG_PRINT("info", ("Get arg count: %d", item_sum->get_arg_count()));
2484   Frame_cursor *fc;
2485   switch (item_sum->sum_func())
2486   {
2487     case Item_sum::CUME_DIST_FUNC:
2488       fc= new Frame_unbounded_preceding(thd,
2489                                         spec->partition_list,
2490                                         spec->order_list);
2491       fc->add_sum_func(item_sum);
2492       cursor_manager->add_cursor(fc);
2493       fc= new Frame_range_current_row_bottom(thd,
2494                                              spec->partition_list,
2495                                              spec->order_list);
2496       fc->add_sum_func(item_sum);
2497       cursor_manager->add_cursor(fc);
2498       break;
2499     case Item_sum::LEAD_FUNC:
2500     case Item_sum::LAG_FUNC:
2501     {
2502       Frame_cursor *bottom_bound= new Frame_unbounded_following(thd,
2503                                                                 spec->partition_list,
2504                                                                 spec->order_list);
2505       Frame_cursor *top_bound= new Frame_unbounded_preceding(thd,
2506                                                              spec->partition_list,
2507                                                              spec->order_list);
2508       Frame_cursor *current_row_pos= new Frame_rows_current_row_bottom;
2509       cursor_manager->add_cursor(bottom_bound);
2510       cursor_manager->add_cursor(top_bound);
2511       cursor_manager->add_cursor(current_row_pos);
2512       DBUG_ASSERT(item_sum->fixed);
2513       bool negative_offset= item_sum->sum_func() == Item_sum::LAG_FUNC;
2514       fc= new Frame_positional_cursor(*current_row_pos,
2515                                       *top_bound, *bottom_bound,
2516                                       *item_sum->get_arg(1),
2517                                       negative_offset);
2518       fc->add_sum_func(item_sum);
2519       cursor_manager->add_cursor(fc);
2520       break;
2521     }
2522     case Item_sum::FIRST_VALUE_FUNC:
2523     {
2524       Frame_cursor *bottom_bound= get_frame_cursor(thd, spec, false);
2525       Frame_cursor *top_bound= get_frame_cursor(thd, spec, true);
2526       cursor_manager->add_cursor(bottom_bound);
2527       cursor_manager->add_cursor(top_bound);
2528       DBUG_ASSERT(item_sum->fixed);
2529       Item *offset_item= new (thd->mem_root) Item_int(thd, 0);
2530       offset_item->fix_fields(thd, &offset_item);
2531       fc= new Frame_positional_cursor(*top_bound,
2532                                       *top_bound, *bottom_bound,
2533                                       *offset_item, false);
2534       fc->add_sum_func(item_sum);
2535       cursor_manager->add_cursor(fc);
2536       break;
2537     }
2538     case Item_sum::LAST_VALUE_FUNC:
2539     {
2540       Frame_cursor *bottom_bound= get_frame_cursor(thd, spec, false);
2541       Frame_cursor *top_bound= get_frame_cursor(thd, spec, true);
2542       cursor_manager->add_cursor(bottom_bound);
2543       cursor_manager->add_cursor(top_bound);
2544       DBUG_ASSERT(item_sum->fixed);
2545       Item *offset_item= new (thd->mem_root) Item_int(thd, 0);
2546       offset_item->fix_fields(thd, &offset_item);
2547       fc= new Frame_positional_cursor(*bottom_bound,
2548                                       *top_bound, *bottom_bound,
2549                                       *offset_item, false);
2550       fc->add_sum_func(item_sum);
2551       cursor_manager->add_cursor(fc);
2552       break;
2553     }
2554     case Item_sum::NTH_VALUE_FUNC:
2555     {
2556       Frame_cursor *bottom_bound= get_frame_cursor(thd, spec, false);
2557       Frame_cursor *top_bound= get_frame_cursor(thd, spec, true);
2558       cursor_manager->add_cursor(bottom_bound);
2559       cursor_manager->add_cursor(top_bound);
2560       DBUG_ASSERT(item_sum->fixed);
2561       Item *int_item= new (thd->mem_root) Item_int(thd, 1);
2562       Item *offset_func= new (thd->mem_root)
2563                               Item_func_minus(thd, item_sum->get_arg(1),
2564                                               int_item);
2565       offset_func->fix_fields(thd, &offset_func);
2566       fc= new Frame_positional_cursor(*top_bound,
2567                                       *top_bound, *bottom_bound,
2568                                       *offset_func, false);
2569       fc->add_sum_func(item_sum);
2570       cursor_manager->add_cursor(fc);
2571       break;
2572     }
2573     case Item_sum::PERCENTILE_CONT_FUNC:
2574     case Item_sum::PERCENTILE_DISC_FUNC:
2575     {
2576       fc= new Frame_unbounded_preceding(thd,
2577                                         spec->partition_list,
2578                                         spec->order_list);
2579       fc->add_sum_func(item_sum);
2580       cursor_manager->add_cursor(fc);
2581       fc= new Frame_unbounded_following(thd,
2582                                           spec->partition_list,
2583                                           spec->order_list);
2584       fc->add_sum_func(item_sum);
2585       cursor_manager->add_cursor(fc);
2586       break;
2587     }
2588     default:
2589       fc= new Frame_unbounded_preceding(
2590               thd, spec->partition_list, spec->order_list);
2591       fc->add_sum_func(item_sum);
2592       cursor_manager->add_cursor(fc);
2593 
2594       fc= new Frame_rows_current_row_bottom;
2595       fc->add_sum_func(item_sum);
2596       cursor_manager->add_cursor(fc);
2597   }
2598 }
2599 
2600 
is_computed_with_remove(Item_sum::Sumfunctype sum_func)2601 static bool is_computed_with_remove(Item_sum::Sumfunctype sum_func)
2602 {
2603   switch (sum_func)
2604   {
2605     case Item_sum::CUME_DIST_FUNC:
2606     case Item_sum::ROW_NUMBER_FUNC:
2607     case Item_sum::RANK_FUNC:
2608     case Item_sum::DENSE_RANK_FUNC:
2609     case Item_sum::NTILE_FUNC:
2610     case Item_sum::FIRST_VALUE_FUNC:
2611     case Item_sum::LAST_VALUE_FUNC:
2612     case Item_sum::PERCENTILE_CONT_FUNC:
2613     case Item_sum::PERCENTILE_DISC_FUNC:
2614       return false;
2615     default:
2616       return true;
2617   }
2618 }
2619 /*
2620    Create required frame cursors for the list of window functions.
2621    Register all functions to their appropriate cursors.
2622    If the window functions share the same frame specification,
2623    those window functions will be registered to the same cursor.
2624 */
get_window_functions_required_cursors(THD * thd,List<Item_window_func> & window_functions,List<Cursor_manager> * cursor_managers)2625 void get_window_functions_required_cursors(
2626     THD *thd,
2627     List<Item_window_func>& window_functions,
2628     List<Cursor_manager> *cursor_managers)
2629 {
2630   List_iterator_fast<Item_window_func> it(window_functions);
2631   Item_window_func* item_win_func;
2632   Item_sum *sum_func;
2633   while ((item_win_func= it++))
2634   {
2635     Cursor_manager *cursor_manager = new Cursor_manager();
2636     sum_func = item_win_func->window_func();
2637     Frame_cursor *fc;
2638     /*
2639       Some window functions require the partition size for computing values.
2640       Add a cursor that retrieves it as the first one in the list if necessary.
2641     */
2642     if (item_win_func->requires_partition_size())
2643     {
2644       if (item_win_func->only_single_element_order_list())
2645       {
2646         fc= new Frame_unbounded_following_set_count_no_nulls(thd,
2647                 item_win_func->window_spec->partition_list,
2648                 item_win_func->window_spec->order_list);
2649       }
2650       else
2651       {
2652         fc= new Frame_unbounded_following_set_count(thd,
2653                 item_win_func->window_spec->partition_list,
2654                 item_win_func->window_spec->order_list);
2655       }
2656       fc->add_sum_func(sum_func);
2657       cursor_manager->add_cursor(fc);
2658     }
2659 
2660     /*
2661       If it is not a regular window function that follows frame specifications,
2662       and/or specific cursors are required. ROW_NUM, RANK, NTILE and others
2663       follow such rules. Check is_frame_prohibited check for the full list.
2664 
2665       TODO(cvicentiu) This approach is messy. Every time a function allows
2666       computation in a certain way, we have to add an extra method to this
2667       factory function. It is better to have window functions output
2668       their own cursors, as needed. This way, the logic is bound
2669       only to the implementation of said window function. Regular aggregate
2670       functions can keep the default frame generating code, overwrite it or
2671       add to it.
2672     */
2673     if (item_win_func->is_frame_prohibited() ||
2674         item_win_func->requires_special_cursors())
2675     {
2676       add_special_frame_cursors(thd, cursor_manager, item_win_func);
2677       cursor_managers->push_back(cursor_manager);
2678       continue;
2679     }
2680 
2681     Frame_cursor *frame_bottom= get_frame_cursor(thd,
2682         item_win_func->window_spec, false);
2683     Frame_cursor *frame_top= get_frame_cursor(thd,
2684         item_win_func->window_spec, true);
2685 
2686     frame_bottom->add_sum_func(sum_func);
2687     frame_top->add_sum_func(sum_func);
2688 
2689     /*
2690        The order of these cursors is important. A sum function
2691        must first add values (via frame_bottom) then remove them via
2692        frame_top. Removing items first doesn't make sense in the case of all
2693        window functions.
2694     */
2695     cursor_manager->add_cursor(frame_bottom);
2696     cursor_manager->add_cursor(frame_top);
2697     if (is_computed_with_remove(sum_func->sum_func()) &&
2698         !sum_func->supports_removal())
2699     {
2700       frame_bottom->set_no_action();
2701       frame_top->set_no_action();
2702       Frame_cursor *scan_cursor= new Frame_scan_cursor(*frame_top,
2703                                                        *frame_bottom);
2704       scan_cursor->add_sum_func(sum_func);
2705       cursor_manager->add_cursor(scan_cursor);
2706 
2707     }
2708     cursor_managers->push_back(cursor_manager);
2709   }
2710 }
2711 
2712 /**
2713   Helper function that takes a list of window functions and writes
2714   their values in the current table record.
2715 */
2716 static
save_window_function_values(List<Item_window_func> & window_functions,TABLE * tbl,uchar * rowid_buf)2717 bool save_window_function_values(List<Item_window_func>& window_functions,
2718                                  TABLE *tbl, uchar *rowid_buf)
2719 {
2720   List_iterator_fast<Item_window_func> iter(window_functions);
2721   JOIN_TAB *join_tab= tbl->reginfo.join_tab;
2722   tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
2723   store_record(tbl, record[1]);
2724   while (Item_window_func *item_win= iter++)
2725     item_win->save_in_field(item_win->result_field, true);
2726 
2727   /*
2728     In case we have window functions present, an extra step is required
2729     to compute all the fields from the temporary table.
2730     In case we have a compound expression such as: expr + expr,
2731     where one of the terms has a window function inside it, only
2732     after computing window function values we actually know the true
2733     final result of the compounded expression.
2734 
2735     Go through all the func items and save their values once again in the
2736     corresponding temp table fields. Do this for each row in the table.
2737 
2738     This needs to be done earlier because ORDER BY clause can also have
2739     a window function, so we need to make sure all the fields of the temp.table
2740     are updated before we do the filesort. So is best to update the other fields
2741     that contain the window functions along with the computation of window
2742     functions.
2743   */
2744 
2745   Item **func_ptr= join_tab->tmp_table_param->items_to_copy;
2746   Item *func;
2747   for (; (func = *func_ptr) ; func_ptr++)
2748   {
2749     if (func->with_window_func && func->type() != Item::WINDOW_FUNC_ITEM)
2750       func->save_in_result_field(true);
2751   }
2752 
2753   int err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]);
2754   if (err && err != HA_ERR_RECORD_IS_THE_SAME)
2755     return true;
2756 
2757   return false;
2758 }
2759 
2760 /*
2761   TODO(cvicentiu) update this comment to reflect the new execution.
2762 
2763   Streamed window function computation with window frames.
2764 
2765   We make a single pass over the ordered temp.table, but we're using three
2766   cursors:
   - current row - the row that we're computing window func value for
2768    - start_bound - the start of the frame
2769    - bottom_bound   - the end of the frame
2770 
2771   All three cursors move together.
2772 
2773   @todo
2774     Provided bounds have their 'cursors'... is it better to re-clone their
2775     cursors or re-position them onto the current row?
2776 
2777   @detail
2778     ROWS BETWEEN 3 PRECEDING  -- frame start
2779               AND 3 FOLLOWING  -- frame end
2780 
2781                                     /------ frame end (aka BOTTOM)
2782     Dataset start                   |
2783      --------====*=======[*]========*========-------->> dataset end
2784                  |        \
2785                  |         +-------- current row
2786                  |
2787                  \-------- frame start ("TOP")
2788 
2789     - frame_end moves forward and adds rows into the aggregate function.
2790     - frame_start follows behind and removes rows from the aggregate function.
2791     - current_row is the row where the value of aggregate function is stored.
2792 
2793   @TODO:  Only the first cursor needs to check for run-out-of-partition
2794   condition (Others can catch up by counting rows?)
2795 
2796 */
compute_window_func(THD * thd,List<Item_window_func> & window_functions,List<Cursor_manager> & cursor_managers,TABLE * tbl,SORT_INFO * filesort_result)2797 bool compute_window_func(THD *thd,
2798                          List<Item_window_func>& window_functions,
2799                          List<Cursor_manager>& cursor_managers,
2800                          TABLE *tbl,
2801                          SORT_INFO *filesort_result)
2802 {
2803   List_iterator_fast<Item_window_func> iter_win_funcs(window_functions);
2804   List_iterator_fast<Cursor_manager> iter_cursor_managers(cursor_managers);
2805   uint err;
2806 
2807   READ_RECORD info;
2808 
2809   if (init_read_record(&info, current_thd, tbl, NULL/*select*/, filesort_result,
2810                        0, 1, FALSE))
2811     return true;
2812 
2813   Cursor_manager *cursor_manager;
2814   while ((cursor_manager= iter_cursor_managers++))
2815     cursor_manager->initialize_cursors(&info);
2816 
2817   /* One partition tracker for each window function. */
2818   List<Group_bound_tracker> partition_trackers;
2819   Item_window_func *win_func;
2820   while ((win_func= iter_win_funcs++))
2821   {
2822     Group_bound_tracker *tracker= new Group_bound_tracker(thd,
2823                                         win_func->window_spec->partition_list);
2824     // TODO(cvicentiu) This should be removed and placed in constructor.
2825     tracker->init();
2826     partition_trackers.push_back(tracker);
2827   }
2828 
2829   List_iterator_fast<Group_bound_tracker> iter_part_trackers(partition_trackers);
2830   ha_rows rownum= 0;
2831   uchar *rowid_buf= (uchar*) my_malloc(PSI_INSTRUMENT_ME, tbl->file->ref_length, MYF(0));
2832 
2833   while (true)
2834   {
2835     if ((err= info.read_record()))
2836       break; // End of file.
2837 
2838     /* Remember current row so that we can restore it before computing
2839        each window function. */
2840     tbl->file->position(tbl->record[0]);
2841     memcpy(rowid_buf, tbl->file->ref, tbl->file->ref_length);
2842 
2843     iter_win_funcs.rewind();
2844     iter_part_trackers.rewind();
2845     iter_cursor_managers.rewind();
2846 
2847     Group_bound_tracker *tracker;
2848     while ((win_func= iter_win_funcs++) &&
2849            (tracker= iter_part_trackers++) &&
2850            (cursor_manager= iter_cursor_managers++))
2851     {
2852       if (tracker->check_if_next_group() || (rownum == 0))
2853       {
2854         /* TODO(cvicentiu)
2855            Clearing window functions should happen through cursors. */
2856         win_func->window_func()->clear();
2857         cursor_manager->notify_cursors_partition_changed(rownum);
2858       }
2859       else
2860       {
2861         cursor_manager->notify_cursors_next_row();
2862       }
2863 
2864       /* Check if we found any error in the window function while adding values
2865          through cursors. */
2866       if (unlikely(thd->is_error() || thd->is_killed()))
2867         break;
2868 
2869 
2870       /* Return to current row after notifying cursors for each window
2871          function. */
2872       tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
2873     }
2874 
2875     /* We now have computed values for each window function. They can now
2876        be saved in the current row. */
2877     save_window_function_values(window_functions, tbl, rowid_buf);
2878 
2879     rownum++;
2880   }
2881 
2882   my_free(rowid_buf);
2883   partition_trackers.delete_elements();
2884   end_read_record(&info);
2885 
2886   return false;
2887 }
2888 
/* Make a list that is a concatenation of two lists of ORDER elements */
2890 
concat_order_lists(MEM_ROOT * mem_root,ORDER * list1,ORDER * list2)2891 static ORDER* concat_order_lists(MEM_ROOT *mem_root, ORDER *list1, ORDER *list2)
2892 {
2893   if (!list1)
2894   {
2895     list1= list2;
2896     list2= NULL;
2897   }
2898 
2899   ORDER *res= NULL; // first element in the new list
2900   ORDER *prev= NULL; // last element in the new list
2901   ORDER *cur_list= list1; // this goes through list1, list2
2902   while (cur_list)
2903   {
2904     for (ORDER *cur= cur_list; cur; cur= cur->next)
2905     {
2906       ORDER *copy= (ORDER*)alloc_root(mem_root, sizeof(ORDER));
2907       memcpy(copy, cur, sizeof(ORDER));
2908       if (prev)
2909         prev->next= copy;
2910       prev= copy;
2911       if (!res)
2912         res= copy;
2913     }
2914 
2915     cur_list= (cur_list == list1)? list2: NULL;
2916   }
2917 
2918   if (prev)
2919     prev->next= NULL;
2920 
2921   return res;
2922 }
2923 
add_function_to_run(Item_window_func * win_func)2924 bool Window_func_runner::add_function_to_run(Item_window_func *win_func)
2925 {
2926 
2927   Item_sum *sum_func= win_func->window_func();
2928   sum_func->setup_window_func(current_thd, win_func->window_spec);
2929 
2930   Item_sum::Sumfunctype type= win_func->window_func()->sum_func();
2931 
2932   switch (type)
2933   {
2934     /* Distinct is not yet supported. */
2935     case Item_sum::GROUP_CONCAT_FUNC:
2936       my_error(ER_NOT_SUPPORTED_YET, MYF(0),
2937                "GROUP_CONCAT() aggregate as window function");
2938       return true;
2939     case Item_sum::SUM_DISTINCT_FUNC:
2940       my_error(ER_NOT_SUPPORTED_YET, MYF(0),
2941                "SUM(DISTINCT) aggregate as window function");
2942       return true;
2943     case Item_sum::AVG_DISTINCT_FUNC:
2944       my_error(ER_NOT_SUPPORTED_YET, MYF(0),
2945                "AVG(DISTINCT) aggregate as window function");
2946       return true;
2947     case Item_sum::COUNT_DISTINCT_FUNC:
2948       my_error(ER_NOT_SUPPORTED_YET, MYF(0),
2949                "COUNT(DISTINCT) aggregate as window function");
2950       return true;
2951     case Item_sum::JSON_ARRAYAGG_FUNC:
2952       my_error(ER_NOT_SUPPORTED_YET, MYF(0),
2953                "JSON_ARRAYAGG() aggregate as window function");
2954       return true;
2955     case Item_sum::JSON_OBJECTAGG_FUNC:
2956       my_error(ER_NOT_SUPPORTED_YET, MYF(0),
2957                "JSON_OBJECTAGG() aggregate as window function");
2958       return true;
2959     default:
2960       break;
2961   }
2962 
2963   return window_functions.push_back(win_func);
2964 }
2965 
2966 
2967 /*
2968   Compute the value of window function for all rows.
2969 */
exec(THD * thd,TABLE * tbl,SORT_INFO * filesort_result)2970 bool Window_func_runner::exec(THD *thd, TABLE *tbl, SORT_INFO *filesort_result)
2971 {
2972   List_iterator_fast<Item_window_func> it(window_functions);
2973   Item_window_func *win_func;
2974   while ((win_func= it++))
2975   {
2976     win_func->set_phase_to_computation();
2977     // TODO(cvicentiu) Setting the aggregator should probably be done during
2978     // setup of Window_funcs_sort.
2979     win_func->window_func()->set_aggregator(Aggregator::SIMPLE_AGGREGATOR);
2980   }
2981   it.rewind();
2982 
2983   List<Cursor_manager> cursor_managers;
2984   get_window_functions_required_cursors(thd, window_functions,
2985                                         &cursor_managers);
2986 
2987   /* Go through the sorted array and compute the window function */
2988   bool is_error= compute_window_func(thd,
2989                                      window_functions,
2990                                      cursor_managers,
2991                                      tbl, filesort_result);
2992   while ((win_func= it++))
2993   {
2994     win_func->set_phase_to_retrieval();
2995   }
2996 
2997   cursor_managers.delete_elements();
2998 
2999   return is_error;
3000 }
3001 
3002 
exec(JOIN * join,bool keep_filesort_result)3003 bool Window_funcs_sort::exec(JOIN *join, bool keep_filesort_result)
3004 {
3005   THD *thd= join->thd;
3006   JOIN_TAB *join_tab= join->join_tab + join->total_join_tab_cnt();
3007 
3008   /* Sort the table based on the most specific sorting criteria of
3009      the window functions. */
3010   if (create_sort_index(thd, join, join_tab, filesort))
3011     return true;
3012 
3013   TABLE *tbl= join_tab->table;
3014   SORT_INFO *filesort_result= join_tab->filesort_result;
3015 
3016   bool is_error= runner.exec(thd, tbl, filesort_result);
3017 
3018   if (!keep_filesort_result)
3019   {
3020     delete join_tab->filesort_result;
3021     join_tab->filesort_result= NULL;
3022   }
3023   return is_error;
3024 }
3025 
3026 
setup(THD * thd,SQL_SELECT * sel,List_iterator<Item_window_func> & it,JOIN_TAB * join_tab)3027 bool Window_funcs_sort::setup(THD *thd, SQL_SELECT *sel,
3028                               List_iterator<Item_window_func> &it,
3029                               JOIN_TAB *join_tab)
3030 {
3031   Window_spec *spec;
3032   Item_window_func *win_func= it.peek();
3033   Item_window_func *win_func_with_longest_order= NULL;
3034   int longest_order_elements= -1;
3035 
3036   /* The iterator should point to a valid function at the start of execution. */
3037   DBUG_ASSERT(win_func);
3038   do
3039   {
3040     spec= win_func->window_spec;
3041     int win_func_order_elements= spec->partition_list->elements +
3042                                   spec->order_list->elements;
3043     if (win_func_order_elements > longest_order_elements)
3044     {
3045       win_func_with_longest_order= win_func;
3046       longest_order_elements= win_func_order_elements;
3047     }
3048     if (runner.add_function_to_run(win_func))
3049       return true;
3050     it++;
3051     win_func= it.peek();
3052   } while (win_func && !(win_func->marker & SORTORDER_CHANGE_FLAG));
3053 
3054   /*
3055     The sort criteria must be taken from the last win_func in the group of
3056     adjacent win_funcs that do not have SORTORDER_CHANGE_FLAG. This is
3057     because the sort order must be the most specific sorting criteria defined
3058     within the window function group. This ensures that we sort the table
3059     in a way that the result is valid for all window functions belonging to
3060     this Window_funcs_sort.
3061   */
3062   spec= win_func_with_longest_order->window_spec;
3063 
3064   ORDER* sort_order= concat_order_lists(thd->mem_root,
3065                                         spec->partition_list->first,
3066                                         spec->order_list->first);
3067   if (sort_order == NULL) // No partition or order by clause.
3068   {
3069     /* TODO(cvicentiu) This is used as a way to allow an empty OVER ()
3070        clause for window functions. However, a better approach is
3071        to not call Filesort at all in this case and just read whatever order
3072        the temporary table has.
3073        Due to cursors not working for out_of_memory cases (yet!), we have to run
3074        filesort to generate a sort buffer of the results.
3075        In this case we sort by the first field of the temporary table.
3076        We should have this field available, even if it is a window_function
3077        field. We don't care of the particular sorting result in this case.
3078      */
3079     ORDER *order= (ORDER *)alloc_root(thd->mem_root, sizeof(ORDER));
3080     memset(order, 0, sizeof(*order));
3081     Item *item= new (thd->mem_root) Item_temptable_field(thd,
3082                                                     join_tab->table->field[0]);
3083     order->item= (Item **)alloc_root(thd->mem_root, 2 * sizeof(Item *));
3084     order->item[1]= NULL;
3085     order->item[0]= item;
3086     order->field= join_tab->table->field[0];
3087     sort_order= order;
3088   }
3089   filesort= new (thd->mem_root) Filesort(sort_order, HA_POS_ERROR, true, NULL);
3090 
3091   /* Apply the same condition that the subsequent sort has. */
3092   filesort->select= sel;
3093 
3094   return false;
3095 }
3096 
3097 
setup(THD * thd,List<Item_window_func> * window_funcs,JOIN_TAB * tab)3098 bool Window_funcs_computation::setup(THD *thd,
3099                                      List<Item_window_func> *window_funcs,
3100                                      JOIN_TAB *tab)
3101 {
3102   order_window_funcs_by_window_specs(window_funcs);
3103 
3104   SQL_SELECT *sel= NULL;
3105   /*
3106      If the tmp table is filtered during sorting
3107      (ex: SELECT with HAVING && ORDER BY), we must make sure to keep the
3108      filtering conditions when we perform sorting for window function
3109      computation.
3110   */
3111   if (tab->filesort && tab->filesort->select)
3112   {
3113     sel= tab->filesort->select;
3114     DBUG_ASSERT(!sel->quick);
3115   }
3116 
3117   Window_funcs_sort *srt;
3118   List_iterator<Item_window_func> iter(*window_funcs);
3119   while (iter.peek())
3120   {
3121     if (!(srt= new Window_funcs_sort()) ||
3122         srt->setup(thd, sel, iter, tab))
3123     {
3124       return true;
3125     }
3126     win_func_sorts.push_back(srt, thd->mem_root);
3127   }
3128   return false;
3129 }
3130 
3131 
exec(JOIN * join,bool keep_last_filesort_result)3132 bool Window_funcs_computation::exec(JOIN *join, bool keep_last_filesort_result)
3133 {
3134   List_iterator<Window_funcs_sort> it(win_func_sorts);
3135   Window_funcs_sort *srt;
3136   uint counter= 0; /* Count how many sorts we've executed. */
3137   /* Execute each sort */
3138   while ((srt = it++))
3139   {
3140     counter++;
3141     bool keep_filesort_result= keep_last_filesort_result &&
3142                                counter == win_func_sorts.elements;
3143     if (srt->exec(join, keep_filesort_result))
3144       return true;
3145   }
3146   return false;
3147 }
3148 
3149 
cleanup()3150 void Window_funcs_computation::cleanup()
3151 {
3152   List_iterator<Window_funcs_sort> it(win_func_sorts);
3153   Window_funcs_sort *srt;
3154   while ((srt = it++))
3155   {
3156     srt->cleanup();
3157     delete srt;
3158   }
3159 }
3160 
3161 
3162 Explain_aggr_window_funcs*
save_explain_plan(MEM_ROOT * mem_root,bool is_analyze)3163 Window_funcs_computation::save_explain_plan(MEM_ROOT *mem_root,
3164                                                  bool is_analyze)
3165 {
3166   Explain_aggr_window_funcs *xpl= new Explain_aggr_window_funcs;
3167   List_iterator<Window_funcs_sort> it(win_func_sorts);
3168   Window_funcs_sort *srt;
3169   if (!xpl)
3170     return 0;
3171   while ((srt = it++))
3172   {
3173     Explain_aggr_filesort *eaf=
3174       new Explain_aggr_filesort(mem_root, is_analyze, srt->filesort);
3175     if (!eaf)
3176       return 0;
3177     xpl->sorts.push_back(eaf, mem_root);
3178   }
3179   return xpl;
3180 }
3181 
3182 
add_window_func(Item_window_func * win_func)3183 bool st_select_lex::add_window_func(Item_window_func *win_func)
3184 {
3185   if (parsing_place != SELECT_LIST)
3186     fields_in_window_functions+= win_func->window_func()->argument_count();
3187   return window_funcs.push_back(win_func);
3188 }
3189 
3190 /////////////////////////////////////////////////////////////////////////////
// Unneeded comments (will be removed when we develop a replacement for
// the feature that was attempted here).
3193 /////////////////////////////////////////////////////////////////////////////
3194   /*
3195    TODO Get this code to set can_compute_window_function during preparation,
3196    not during execution.
3197 
3198    The reason for this is the following:
3199    Our single scan optimization for window functions without tmp table,
3200    is valid, if and only if, we only need to perform one sorting operation,
3201    via filesort. The cases where we need to perform one sorting operation only:
3202 
3203    * A select with only one window function.
3204    * A select with multiple window functions, but they must have their
3205      partition and order by clauses compatible. This means that one ordering
3206      is acceptable for both window functions.
3207 
3208        For example:
3209        partition by a, b, c; order by d, e    results in sorting by a b c d e.
3210        partition by a; order by d             results in sorting by a d.
3211 
3212        This kind of sorting is compatible. The less specific partition does
3213        not care for the order of b and c columns so it is valid if we sort
3214        by those in case of equality over a.
3215 
3216        partition by a, b; order by d, e      results in sorting by a b d e
3217        partition by a; order by e            results in sorting by a e
3218 
3219       This sorting is incompatible due to the order by clause. The partition by
3220       clause is compatible, (partition by a) is a prefix for (partition by a, b)
3221       However, order by e is not a prefix for order by d, e, thus it is not
3222       compatible.
3223 
3224     The rule for having compatible sorting is thus:
      Each partition list must either contain the partition lists of the other
      window functions as prefixes, or be a prefix of them itself. This must
      hold true for all partitions. The same applies to the order by clause.
3228   */
3229 #if 0
3230   List<Item_window_func> window_functions;
3231   SQL_I_List<ORDER> largest_partition;
3232   SQL_I_List<ORDER> largest_order_by;
3233   bool can_compute_window_live = !need_tmp;
3234   // Construct the window_functions item list and check if they can be
3235   // computed using only one sorting.
3236   //
3237   // TODO: Perhaps group functions into compatible sorting bins
3238   // to minimize the number of sorting passes required to compute all of them.
3239   while ((item= it++))
3240   {
3241     if (item->type() == Item::WINDOW_FUNC_ITEM)
3242     {
3243       Item_window_func *item_win = (Item_window_func *) item;
3244       window_functions.push_back(item_win);
3245       if (!can_compute_window_live)
3246         continue;  // No point checking  since we have to perform multiple sorts.
3247       Window_spec *spec = item_win->window_spec;
3248       // Having an empty partition list on one window function and a
3249       // not empty list on a separate window function causes the sorting
3250       // to be incompatible.
3251       //
3252       // Example:
3253       // over (partition by a, order by x) && over (order by x).
3254       //
3255       // The first function requires an ordering by a first and then by x,
      // while the second function requires an ordering by x first.
3257       // The same restriction is not required for the order by clause.
3258       if (largest_partition.elements && !spec->partition_list.elements)
3259       {
3260         can_compute_window_live= FALSE;
3261         continue;
3262       }
3263       can_compute_window_live= test_if_order_compatible(largest_partition,
3264                                                         spec->partition_list);
3265       if (!can_compute_window_live)
3266         continue;
3267 
3268       can_compute_window_live= test_if_order_compatible(largest_order_by,
3269                                                         spec->order_list);
3270       if (!can_compute_window_live)
3271         continue;
3272 
3273       if (largest_partition.elements < spec->partition_list.elements)
3274         largest_partition = spec->partition_list;
3275       if (largest_order_by.elements < spec->order_list.elements)
3276         largest_order_by = spec->order_list;
3277     }
3278   }
3279   if (can_compute_window_live && window_functions.elements && table_count == 1)
3280   {
3281     ha_rows examined_rows = 0;
3282     ha_rows found_rows = 0;
3283     ha_rows filesort_retval;
3284     SORT_FIELD *s_order= (SORT_FIELD *) my_malloc(sizeof(SORT_FIELD) *
3285         (largest_partition.elements + largest_order_by.elements) + 1,
3286         MYF(MY_WME | MY_ZEROFILL | MY_THREAD_SPECIFIC));
3287 
3288     size_t pos= 0;
3289     for (ORDER* curr = largest_partition.first; curr; curr=curr->next, pos++)
3290       s_order[pos].item = *curr->item;
3291 
3292     for (ORDER* curr = largest_order_by.first; curr; curr=curr->next, pos++)
3293       s_order[pos].item = *curr->item;
3294 
3295     table[0]->sort.io_cache=(IO_CACHE*) my_malloc(sizeof(IO_CACHE),
3296                                                MYF(MY_WME | MY_ZEROFILL|
3297                                                    MY_THREAD_SPECIFIC));
3298 
3299 
3300     filesort_retval= filesort(thd, table[0], s_order,
3301                               (largest_partition.elements + largest_order_by.elements),
3302                               this->select, HA_POS_ERROR, FALSE,
3303                               &examined_rows, &found_rows,
3304                               this->explain->ops_tracker.report_sorting(thd));
3305     table[0]->sort.found_records= filesort_retval;
3306 
3307     join_tab->read_first_record = join_init_read_record;
3308     join_tab->records= found_rows;
3309 
3310     my_free(s_order);
3311   }
3312   else
3313 #endif
3314