1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.sql.catalyst.optimizer
19
20import org.apache.spark.sql.catalyst.analysis
21import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
22import org.apache.spark.sql.catalyst.dsl.expressions._
23import org.apache.spark.sql.catalyst.dsl.plans._
24import org.apache.spark.sql.catalyst.expressions._
25import org.apache.spark.sql.catalyst.plans._
26import org.apache.spark.sql.catalyst.plans.logical._
27import org.apache.spark.sql.catalyst.rules._
28import org.apache.spark.sql.types.IntegerType
29
30class FilterPushdownSuite extends PlanTest {
31
32  object Optimize extends RuleExecutor[LogicalPlan] {
33    val batches =
34      Batch("Subqueries", Once,
35        EliminateSubqueryAliases) ::
36      Batch("Filter Pushdown", FixedPoint(10),
37        CombineFilters,
38        PushDownPredicate,
39        BooleanSimplification,
40        PushPredicateThroughJoin,
41        CollapseProject) :: Nil
42  }
43
44  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
45
46  val testRelation1 = LocalRelation('d.int)
47
  // Sanity check: subquery aliases are removed before any filter pushdown is applied.
49  test("eliminate subqueries") {
50    val originalQuery =
51      testRelation
52        .subquery('y)
53        .select('a)
54
55    val optimized = Optimize.execute(originalQuery.analyze)
56    val correctAnswer =
57      testRelation
58        .select('a.attr)
59        .analyze
60
61    comparePlans(optimized, correctAnswer)
62  }
63
  // The tests below exercise the filter pushdown rules.
65  test("simple push down") {
66    val originalQuery =
67      testRelation
68        .select('a)
69        .where('a === 1)
70
71    val optimized = Optimize.execute(originalQuery.analyze)
72    val correctAnswer =
73      testRelation
74        .where('a === 1)
75        .select('a)
76        .analyze
77
78    comparePlans(optimized, correctAnswer)
79  }
80
81  test("combine redundant filters") {
82    val originalQuery =
83      testRelation
84        .where('a === 1 && 'b === 1)
85        .where('a === 1 && 'c === 1)
86
87    val optimized = Optimize.execute(originalQuery.analyze)
88    val correctAnswer =
89      testRelation
90        .where('a === 1 && 'b === 1 && 'c === 1)
91        .analyze
92
93    comparePlans(optimized, correctAnswer)
94  }
95
96  test("SPARK-16164: Filter pushdown should keep the ordering in the logical plan") {
97    val originalQuery =
98      testRelation
99        .where('a === 1)
100        .select('a, 'b)
101        .where('b === 1)
102
103    val optimized = Optimize.execute(originalQuery.analyze)
104    val correctAnswer =
105      testRelation
106        .where('a === 1 && 'b === 1)
107        .select('a, 'b)
108        .analyze
109
110    // We can not use comparePlans here because it normalized the plan.
111    assert(optimized == correctAnswer)
112  }
113
114  test("SPARK-16994: filter should not be pushed through limit") {
115    val originalQuery = testRelation.limit(10).where('a === 1).analyze
116    val optimized = Optimize.execute(originalQuery)
117    comparePlans(optimized, originalQuery)
118  }
119
120  test("can't push without rewrite") {
121    val originalQuery =
122      testRelation
123        .select('a + 'b as 'e)
124        .where('e === 1)
125        .analyze
126
127    val optimized = Optimize.execute(originalQuery.analyze)
128    val correctAnswer =
129      testRelation
130        .where('a + 'b === 1)
131        .select('a + 'b as 'e)
132        .analyze
133
134    comparePlans(optimized, correctAnswer)
135  }
136
137  test("nondeterministic: can always push down filter through project with deterministic field") {
138    val originalQuery = testRelation
139      .select('a)
140      .where(Rand(10) > 5 || 'a > 5)
141      .analyze
142
143    val optimized = Optimize.execute(originalQuery)
144
145    val correctAnswer = testRelation
146      .where(Rand(10) > 5 || 'a > 5)
147      .select('a)
148      .analyze
149
150    comparePlans(optimized, correctAnswer)
151  }
152
153  test("nondeterministic: can't push down filter through project with nondeterministic field") {
154    val originalQuery = testRelation
155      .select(Rand(10).as('rand), 'a)
156      .where('a > 5)
157      .analyze
158
159    val optimized = Optimize.execute(originalQuery)
160
161    comparePlans(optimized, originalQuery)
162  }
163
164  test("nondeterministic: can't push down filter through aggregate with nondeterministic field") {
165    val originalQuery = testRelation
166      .groupBy('a)('a, Rand(10).as('rand))
167      .where('a > 5)
168      .analyze
169
170    val optimized = Optimize.execute(originalQuery)
171
172    comparePlans(optimized, originalQuery)
173  }
174
175  test("nondeterministic: push down part of filter through aggregate with deterministic field") {
176    val originalQuery = testRelation
177      .groupBy('a)('a)
178      .where('a > 5 && Rand(10) > 5)
179      .analyze
180
181    val optimized = Optimize.execute(originalQuery.analyze)
182
183    val correctAnswer = testRelation
184      .where('a > 5)
185      .groupBy('a)('a)
186      .where(Rand(10) > 5)
187      .analyze
188
189    comparePlans(optimized, correctAnswer)
190  }
191
192  test("filters: combines filters") {
193    val originalQuery = testRelation
194      .select('a)
195      .where('a === 1)
196      .where('a === 2)
197
198    val optimized = Optimize.execute(originalQuery.analyze)
199    val correctAnswer =
200      testRelation
201        .where('a === 1 && 'a === 2)
202        .select('a).analyze
203
204    comparePlans(optimized, correctAnswer)
205  }
206
207  test("joins: push to either side") {
208    val x = testRelation.subquery('x)
209    val y = testRelation.subquery('y)
210
211    val originalQuery = {
212      x.join(y)
213        .where("x.b".attr === 1)
214        .where("y.b".attr === 2)
215    }
216
217    val optimized = Optimize.execute(originalQuery.analyze)
218    val left = testRelation.where('b === 1)
219    val right = testRelation.where('b === 2)
220    val correctAnswer =
221      left.join(right).analyze
222
223    comparePlans(optimized, correctAnswer)
224  }
225
226  test("joins: push to one side") {
227    val x = testRelation.subquery('x)
228    val y = testRelation.subquery('y)
229
230    val originalQuery = {
231      x.join(y)
232        .where("x.b".attr === 1)
233    }
234
235    val optimized = Optimize.execute(originalQuery.analyze)
236    val left = testRelation.where('b === 1)
237    val right = testRelation
238    val correctAnswer =
239      left.join(right).analyze
240
241    comparePlans(optimized, correctAnswer)
242  }
243
244  test("joins: push to one side after transformCondition") {
245    val x = testRelation.subquery('x)
246    val y = testRelation1.subquery('y)
247
248    val originalQuery = {
249      x.join(y)
250       .where(("x.a".attr === 1 && "y.d".attr === "x.b".attr) ||
251              ("x.a".attr === 1 && "y.d".attr === "x.c".attr))
252    }
253
254    val optimized = Optimize.execute(originalQuery.analyze)
255    val left = testRelation.where('a === 1)
256    val right = testRelation1
257    val correctAnswer =
258      left.join(right, condition = Some("d".attr === "b".attr || "d".attr === "c".attr)).analyze
259
260    comparePlans(optimized, correctAnswer)
261  }
262
263  test("joins: rewrite filter to push to either side") {
264    val x = testRelation.subquery('x)
265    val y = testRelation.subquery('y)
266
267    val originalQuery = {
268      x.join(y)
269        .where("x.b".attr === 1 && "y.b".attr === 2)
270    }
271
272    val optimized = Optimize.execute(originalQuery.analyze)
273    val left = testRelation.where('b === 1)
274    val right = testRelation.where('b === 2)
275    val correctAnswer =
276      left.join(right).analyze
277
278    comparePlans(optimized, correctAnswer)
279  }
280
281  test("joins: push down left semi join") {
282    val x = testRelation.subquery('x)
283    val y = testRelation1.subquery('y)
284
285    val originalQuery = {
286      x.join(y, LeftSemi, Option("x.a".attr === "y.d".attr && "x.b".attr >= 1 && "y.d".attr >= 2))
287    }
288
289    val optimized = Optimize.execute(originalQuery.analyze)
290    val left = testRelation.where('b >= 1)
291    val right = testRelation1.where('d >= 2)
292    val correctAnswer =
293      left.join(right, LeftSemi, Option("a".attr === "d".attr)).analyze
294
295    comparePlans(optimized, correctAnswer)
296  }
297
298  test("joins: push down left outer join #1") {
299    val x = testRelation.subquery('x)
300    val y = testRelation.subquery('y)
301
302    val originalQuery = {
303      x.join(y, LeftOuter)
304        .where("x.b".attr === 1 && "y.b".attr === 2)
305    }
306
307    val optimized = Optimize.execute(originalQuery.analyze)
308    val left = testRelation.where('b === 1)
309    val correctAnswer =
310      left.join(y, LeftOuter).where("y.b".attr === 2).analyze
311
312    comparePlans(optimized, correctAnswer)
313  }
314
315  test("joins: push down right outer join #1") {
316    val x = testRelation.subquery('x)
317    val y = testRelation.subquery('y)
318
319    val originalQuery = {
320      x.join(y, RightOuter)
321        .where("x.b".attr === 1 && "y.b".attr === 2)
322    }
323
324    val optimized = Optimize.execute(originalQuery.analyze)
325    val right = testRelation.where('b === 2).subquery('d)
326    val correctAnswer =
327      x.join(right, RightOuter).where("x.b".attr === 1).analyze
328
329    comparePlans(optimized, correctAnswer)
330  }
331
332  test("joins: push down left outer join #2") {
333    val x = testRelation.subquery('x)
334    val y = testRelation.subquery('y)
335
336    val originalQuery = {
337      x.join(y, LeftOuter, Some("x.b".attr === 1))
338        .where("x.b".attr === 2 && "y.b".attr === 2)
339    }
340
341    val optimized = Optimize.execute(originalQuery.analyze)
342    val left = testRelation.where('b === 2).subquery('d)
343    val correctAnswer =
344      left.join(y, LeftOuter, Some("d.b".attr === 1)).where("y.b".attr === 2).analyze
345
346    comparePlans(optimized, correctAnswer)
347  }
348
349  test("joins: push down right outer join #2") {
350    val x = testRelation.subquery('x)
351    val y = testRelation.subquery('y)
352
353    val originalQuery = {
354      x.join(y, RightOuter, Some("y.b".attr === 1))
355        .where("x.b".attr === 2 && "y.b".attr === 2)
356    }
357
358    val optimized = Optimize.execute(originalQuery.analyze)
359    val right = testRelation.where('b === 2).subquery('d)
360    val correctAnswer =
361      x.join(right, RightOuter, Some("d.b".attr === 1)).where("x.b".attr === 2).analyze
362
363    comparePlans(optimized, correctAnswer)
364  }
365
366  test("joins: push down left outer join #3") {
367    val x = testRelation.subquery('x)
368    val y = testRelation.subquery('y)
369
370    val originalQuery = {
371      x.join(y, LeftOuter, Some("y.b".attr === 1))
372        .where("x.b".attr === 2 && "y.b".attr === 2)
373    }
374
375    val optimized = Optimize.execute(originalQuery.analyze)
376    val left = testRelation.where('b === 2).subquery('l)
377    val right = testRelation.where('b === 1).subquery('r)
378    val correctAnswer =
379      left.join(right, LeftOuter).where("r.b".attr === 2).analyze
380
381    comparePlans(optimized, correctAnswer)
382  }
383
384  test("joins: push down right outer join #3") {
385    val x = testRelation.subquery('x)
386    val y = testRelation.subquery('y)
387
388    val originalQuery = {
389      x.join(y, RightOuter, Some("y.b".attr === 1))
390        .where("x.b".attr === 2 && "y.b".attr === 2)
391    }
392
393    val optimized = Optimize.execute(originalQuery.analyze)
394    val right = testRelation.where('b === 2).subquery('r)
395    val correctAnswer =
396      x.join(right, RightOuter, Some("r.b".attr === 1)).where("x.b".attr === 2).analyze
397
398    comparePlans(optimized, correctAnswer)
399  }
400
401  test("joins: push down left outer join #4") {
402    val x = testRelation.subquery('x)
403    val y = testRelation.subquery('y)
404
405    val originalQuery = {
406      x.join(y, LeftOuter, Some("y.b".attr === 1))
407        .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
408    }
409
410    val optimized = Optimize.execute(originalQuery.analyze)
411    val left = testRelation.where('b === 2).subquery('l)
412    val right = testRelation.where('b === 1).subquery('r)
413    val correctAnswer =
414      left.join(right, LeftOuter).where("r.b".attr === 2 && "l.c".attr === "r.c".attr).analyze
415
416    comparePlans(optimized, correctAnswer)
417  }
418
419  test("joins: push down right outer join #4") {
420    val x = testRelation.subquery('x)
421    val y = testRelation.subquery('y)
422
423    val originalQuery = {
424      x.join(y, RightOuter, Some("y.b".attr === 1))
425        .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
426    }
427
428    val optimized = Optimize.execute(originalQuery.analyze)
429    val left = testRelation.subquery('l)
430    val right = testRelation.where('b === 2).subquery('r)
431    val correctAnswer =
432      left.join(right, RightOuter, Some("r.b".attr === 1)).
433        where("l.b".attr === 2 && "l.c".attr === "r.c".attr).analyze
434
435    comparePlans(optimized, correctAnswer)
436  }
437
438  test("joins: push down left outer join #5") {
439    val x = testRelation.subquery('x)
440    val y = testRelation.subquery('y)
441
442    val originalQuery = {
443      x.join(y, LeftOuter, Some("y.b".attr === 1 && "x.a".attr === 3))
444        .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
445    }
446
447    val optimized = Optimize.execute(originalQuery.analyze)
448    val left = testRelation.where('b === 2).subquery('l)
449    val right = testRelation.where('b === 1).subquery('r)
450    val correctAnswer =
451      left.join(right, LeftOuter, Some("l.a".attr===3)).
452        where("r.b".attr === 2 && "l.c".attr === "r.c".attr).analyze
453
454    comparePlans(optimized, correctAnswer)
455  }
456
457  test("joins: push down right outer join #5") {
458    val x = testRelation.subquery('x)
459    val y = testRelation.subquery('y)
460
461    val originalQuery = {
462      x.join(y, RightOuter, Some("y.b".attr === 1 && "x.a".attr === 3))
463        .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
464    }
465
466    val optimized = Optimize.execute(originalQuery.analyze)
467    val left = testRelation.where('a === 3).subquery('l)
468    val right = testRelation.where('b === 2).subquery('r)
469    val correctAnswer =
470      left.join(right, RightOuter, Some("r.b".attr === 1)).
471        where("l.b".attr === 2 && "l.c".attr === "r.c".attr).analyze
472
473    comparePlans(optimized, correctAnswer)
474  }
475
476  test("joins: can't push down") {
477    val x = testRelation.subquery('x)
478    val y = testRelation.subquery('y)
479
480    val originalQuery = {
481      x.join(y, condition = Some("x.b".attr === "y.b".attr))
482    }
483    val optimized = Optimize.execute(originalQuery.analyze)
484
485    comparePlans(analysis.EliminateSubqueryAliases(originalQuery.analyze), optimized)
486  }
487
488  test("joins: conjunctive predicates") {
489    val x = testRelation.subquery('x)
490    val y = testRelation.subquery('y)
491
492    val originalQuery = {
493      x.join(y)
494        .where(("x.b".attr === "y.b".attr) && ("x.a".attr === 1) && ("y.a".attr === 1))
495    }
496
497    val optimized = Optimize.execute(originalQuery.analyze)
498    val left = testRelation.where('a === 1).subquery('x)
499    val right = testRelation.where('a === 1).subquery('y)
500    val correctAnswer =
501      left.join(right, condition = Some("x.b".attr === "y.b".attr))
502        .analyze
503
504    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
505  }
506
507  test("joins: conjunctive predicates #2") {
508    val x = testRelation.subquery('x)
509    val y = testRelation.subquery('y)
510
511    val originalQuery = {
512      x.join(y)
513        .where(("x.b".attr === "y.b".attr) && ("x.a".attr === 1))
514    }
515
516    val optimized = Optimize.execute(originalQuery.analyze)
517    val left = testRelation.where('a === 1).subquery('x)
518    val right = testRelation.subquery('y)
519    val correctAnswer =
520      left.join(right, condition = Some("x.b".attr === "y.b".attr))
521        .analyze
522
523    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
524  }
525
526  test("joins: conjunctive predicates #3") {
527    val x = testRelation.subquery('x)
528    val y = testRelation.subquery('y)
529    val z = testRelation.subquery('z)
530
531    val originalQuery = {
532      z.join(x.join(y))
533        .where(("x.b".attr === "y.b".attr) && ("x.a".attr === 1) &&
534          ("z.a".attr >= 3) && ("z.a".attr === "x.b".attr))
535    }
536
537    val optimized = Optimize.execute(originalQuery.analyze)
538    val lleft = testRelation.where('a >= 3).subquery('z)
539    val left = testRelation.where('a === 1).subquery('x)
540    val right = testRelation.subquery('y)
541    val correctAnswer =
542      lleft.join(
543        left.join(right, condition = Some("x.b".attr === "y.b".attr)),
544          condition = Some("z.a".attr === "x.b".attr))
545        .analyze
546
547    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
548  }
549
550  test("joins: push down where clause into left anti join") {
551    val x = testRelation.subquery('x)
552    val y = testRelation.subquery('y)
553    val originalQuery =
554      x.join(y, LeftAnti, Some("x.b".attr === "y.b".attr))
555        .where("x.a".attr > 10)
556        .analyze
557    val optimized = Optimize.execute(originalQuery)
558    val correctAnswer =
559      x.where("x.a".attr > 10)
560        .join(y, LeftAnti, Some("x.b".attr === "y.b".attr))
561        .analyze
562    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
563  }
564
565  test("joins: only push down join conditions to the right of a left anti join") {
566    val x = testRelation.subquery('x)
567    val y = testRelation.subquery('y)
568    val originalQuery =
569      x.join(y,
570        LeftAnti,
571        Some("x.b".attr === "y.b".attr && "y.a".attr > 10 && "x.a".attr > 10)).analyze
572    val optimized = Optimize.execute(originalQuery)
573    val correctAnswer =
574      x.join(
575        y.where("y.a".attr > 10),
576        LeftAnti,
577        Some("x.b".attr === "y.b".attr && "x.a".attr > 10))
578        .analyze
579    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
580  }
581
582  test("joins: only push down join conditions to the right of an existence join") {
583    val x = testRelation.subquery('x)
584    val y = testRelation.subquery('y)
585    val fillerVal = 'val.boolean
586    val originalQuery =
587      x.join(y,
588        ExistenceJoin(fillerVal),
589        Some("x.a".attr > 1 && "y.b".attr > 2)).analyze
590    val optimized = Optimize.execute(originalQuery)
591    val correctAnswer =
592      x.join(
593        y.where("y.b".attr > 2),
594        ExistenceJoin(fillerVal),
595        Some("x.a".attr > 1))
596      .analyze
597    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
598  }
599
600  val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))
601
602  test("generate: predicate referenced no generated column") {
603    val originalQuery = {
604      testRelationWithArrayType
605        .generate(Explode('c_arr), true, false, Some("arr"))
606        .where(('b >= 5) && ('a > 6))
607    }
608    val optimized = Optimize.execute(originalQuery.analyze)
609    val correctAnswer = {
610      testRelationWithArrayType
611        .where(('b >= 5) && ('a > 6))
612        .generate(Explode('c_arr), true, false, Some("arr")).analyze
613    }
614
615    comparePlans(optimized, correctAnswer)
616  }
617
618  test("generate: non-deterministic predicate referenced no generated column") {
619    val originalQuery = {
620      testRelationWithArrayType
621        .generate(Explode('c_arr), true, false, Some("arr"))
622        .where(('b >= 5) && ('a + Rand(10).as("rnd") > 6) && ('c > 6))
623    }
624    val optimized = Optimize.execute(originalQuery.analyze)
625    val correctAnswer = {
626      testRelationWithArrayType
627        .where('b >= 5)
628        .generate(Explode('c_arr), true, false, Some("arr"))
629        .where('a + Rand(10).as("rnd") > 6 && 'c > 6)
630        .analyze
631    }
632
633    comparePlans(optimized, correctAnswer)
634  }
635
636  test("generate: part of conjuncts referenced generated column") {
637    val generator = Explode('c_arr)
638    val originalQuery = {
639      testRelationWithArrayType
640        .generate(generator, true, false, Some("arr"))
641        .where(('b >= 5) && ('c > 6))
642    }
643    val optimized = Optimize.execute(originalQuery.analyze)
644    val referenceResult = {
645      testRelationWithArrayType
646        .where('b >= 5)
647        .generate(generator, true, false, Some("arr"))
648        .where('c > 6).analyze
649    }
650
651    // Since newly generated columns get different ids every time being analyzed
652    // e.g. comparePlans(originalQuery.analyze, originalQuery.analyze) fails.
653    // So we check operators manually here.
654    // Filter("c" > 6)
655    assertResult(classOf[Filter])(optimized.getClass)
656    assertResult(1)(optimized.asInstanceOf[Filter].condition.references.size)
657    assertResult("c") {
658      optimized.asInstanceOf[Filter].condition.references.toSeq(0).name
659    }
660
661    // the rest part
662    comparePlans(optimized.children(0), referenceResult.children(0))
663  }
664
665  test("generate: all conjuncts referenced generated column") {
666    val originalQuery = {
667      testRelationWithArrayType
668        .generate(Explode('c_arr), true, false, Some("arr"))
669        .where(('c > 6) || ('b > 5)).analyze
670    }
671    val optimized = Optimize.execute(originalQuery)
672
673    comparePlans(optimized, originalQuery)
674  }
675
676  test("aggregate: push down filter when filter on group by expression") {
677    val originalQuery = testRelation
678                        .groupBy('a)('a, count('b) as 'c)
679                        .select('a, 'c)
680                        .where('a === 2)
681
682    val optimized = Optimize.execute(originalQuery.analyze)
683
684    val correctAnswer = testRelation
685                        .where('a === 2)
686                        .groupBy('a)('a, count('b) as 'c)
687                        .analyze
688    comparePlans(optimized, correctAnswer)
689  }
690
691  test("aggregate: don't push down filter when filter not on group by expression") {
692    val originalQuery = testRelation
693                        .select('a, 'b)
694                        .groupBy('a)('a, count('b) as 'c)
695                        .where('c === 2L)
696
697    val optimized = Optimize.execute(originalQuery.analyze)
698
699    comparePlans(optimized, originalQuery.analyze)
700  }
701
702  test("aggregate: push down filters partially which are subset of group by expressions") {
703    val originalQuery = testRelation
704                        .select('a, 'b)
705                        .groupBy('a)('a, count('b) as 'c)
706                        .where('c === 2L && 'a === 3)
707
708    val optimized = Optimize.execute(originalQuery.analyze)
709
710    val correctAnswer = testRelation
711                        .where('a === 3)
712                        .select('a, 'b)
713                        .groupBy('a)('a, count('b) as 'c)
714                        .where('c === 2L)
715                        .analyze
716
717    comparePlans(optimized, correctAnswer)
718  }
719
720  test("aggregate: push down filters with alias") {
721    val originalQuery = testRelation
722      .select('a, 'b)
723      .groupBy('a)(('a + 1) as 'aa, count('b) as 'c)
724      .where(('c === 2L || 'aa > 4) && 'aa < 3)
725
726    val optimized = Optimize.execute(originalQuery.analyze)
727
728    val correctAnswer = testRelation
729      .where('a + 1 < 3)
730      .select('a, 'b)
731      .groupBy('a)(('a + 1) as 'aa, count('b) as 'c)
732      .where('c === 2L || 'aa > 4)
733      .analyze
734
735    comparePlans(optimized, correctAnswer)
736  }
737
738  test("aggregate: push down filters with literal") {
739    val originalQuery = testRelation
740      .select('a, 'b)
741      .groupBy('a)('a, count('b) as 'c, "s" as 'd)
742      .where('c === 2L && 'd === "s")
743
744    val optimized = Optimize.execute(originalQuery.analyze)
745
746    val correctAnswer = testRelation
747      .where("s" === "s")
748      .select('a, 'b)
749      .groupBy('a)('a, count('b) as 'c, "s" as 'd)
750      .where('c === 2L)
751      .analyze
752
753    comparePlans(optimized, correctAnswer)
754  }
755
756  test("aggregate: don't push down filters that are nondeterministic") {
757    val originalQuery = testRelation
758      .select('a, 'b)
759      .groupBy('a)('a + Rand(10) as 'aa, count('b) as 'c, Rand(11).as("rnd"))
760      .where('c === 2L && 'aa + Rand(10).as("rnd") === 3 && 'rnd === 5)
761
762    val optimized = Optimize.execute(originalQuery.analyze)
763
764    val correctAnswer = testRelation
765      .select('a, 'b)
766      .groupBy('a)('a + Rand(10) as 'aa, count('b) as 'c, Rand(11).as("rnd"))
767      .where('c === 2L && 'aa + Rand(10).as("rnd") === 3 && 'rnd === 5)
768      .analyze
769
770    comparePlans(optimized, correctAnswer)
771  }
772
773  test("SPARK-17712: aggregate: don't push down filters that are data-independent") {
774    val originalQuery = LocalRelation.apply(testRelation.output, Seq.empty)
775      .select('a, 'b)
776      .groupBy('a)(count('a))
777      .where(false)
778
779    val optimized = Optimize.execute(originalQuery.analyze)
780
781    val correctAnswer = testRelation
782      .select('a, 'b)
783      .groupBy('a)(count('a))
784      .where(false)
785      .analyze
786
787    comparePlans(optimized, correctAnswer)
788  }
789
790  test("broadcast hint") {
791    val originalQuery = BroadcastHint(testRelation)
792      .where('a === 2L && 'b + Rand(10).as("rnd") === 3)
793
794    val optimized = Optimize.execute(originalQuery.analyze)
795
796    val correctAnswer = BroadcastHint(testRelation.where('a === 2L))
797      .where('b + Rand(10).as("rnd") === 3)
798      .analyze
799
800    comparePlans(optimized, correctAnswer)
801  }
802
803  test("union") {
804    val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)
805
806    val originalQuery = Union(Seq(testRelation, testRelation2))
807      .where('a === 2L && 'b + Rand(10).as("rnd") === 3 && 'c > 5L)
808
809    val optimized = Optimize.execute(originalQuery.analyze)
810
811    val correctAnswer = Union(Seq(
812      testRelation.where('a === 2L),
813      testRelation2.where('d === 2L)))
814      .where('b + Rand(10).as("rnd") === 3 && 'c > 5L)
815      .analyze
816
817    comparePlans(optimized, correctAnswer)
818  }
819
820  test("expand") {
821    val agg = testRelation
822      .groupBy(Cube(Seq('a, 'b)))('a, 'b, sum('c))
823      .analyze
824      .asInstanceOf[Aggregate]
825
826    val a = agg.output(0)
827    val b = agg.output(1)
828
829    val query = agg.where(a > 1 && b > 2)
830    val optimized = Optimize.execute(query)
831    val correctedAnswer = agg.copy(child = agg.child.where(a > 1 && b > 2)).analyze
832    comparePlans(optimized, correctedAnswer)
833  }
834
835  test("predicate subquery: push down simple") {
836    val x = testRelation.subquery('x)
837    val y = testRelation.subquery('y)
838    val z = LocalRelation('a.int, 'b.int, 'c.int).subquery('z)
839
840    val query = x
841      .join(y, Inner, Option("x.a".attr === "y.a".attr))
842      .where(Exists(z.where("x.a".attr === "z.a".attr)))
843      .analyze
844    val answer = x
845      .where(Exists(z.where("x.a".attr === "z.a".attr)))
846      .join(y, Inner, Option("x.a".attr === "y.a".attr))
847      .analyze
848    val optimized = Optimize.execute(Optimize.execute(query))
849    comparePlans(optimized, answer)
850  }
851
852  test("predicate subquery: push down complex") {
853    val w = testRelation.subquery('w)
854    val x = testRelation.subquery('x)
855    val y = testRelation.subquery('y)
856    val z = LocalRelation('a.int, 'b.int, 'c.int).subquery('z)
857
858    val query = w
859      .join(x, Inner, Option("w.a".attr === "x.a".attr))
860      .join(y, LeftOuter, Option("x.a".attr === "y.a".attr))
861      .where(Exists(z.where("w.a".attr === "z.a".attr)))
862      .analyze
863    val answer = w
864      .where(Exists(z.where("w.a".attr === "z.a".attr)))
865      .join(x, Inner, Option("w.a".attr === "x.a".attr))
866      .join(y, LeftOuter, Option("x.a".attr === "y.a".attr))
867      .analyze
868    val optimized = Optimize.execute(Optimize.execute(query))
869    comparePlans(optimized, answer)
870  }
871
872  test("Window: predicate push down -- basic") {
873    val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
874
875    val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)
876    val correctAnswer = testRelation
877      .where('a > 1).select('a, 'b, 'c)
878      .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
879      .select('a, 'b, 'c, 'window).analyze
880
881    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
882  }
883
884  test("Window: predicate push down -- predicates with compound predicate using only one column") {
885    val winExpr =
886      windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
887
888    val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a * 3 > 15)
889    val correctAnswer = testRelation
890      .where('a * 3 > 15).select('a, 'b, 'c)
891      .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
892      .select('a, 'b, 'c, 'window).analyze
893
894    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
895  }
896
897  test("Window: predicate push down -- multi window expressions with the same window spec") {
898    val winSpec = windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
899    val winExpr1 = windowExpr(count('b), winSpec)
900    val winExpr2 = windowExpr(sum('b), winSpec)
901    val originalQuery = testRelation
902      .select('a, 'b, 'c, winExpr1.as('window1), winExpr2.as('window2)).where('a > 1)
903
904    val correctAnswer = testRelation
905      .where('a > 1).select('a, 'b, 'c)
906      .window(winExpr1.as('window1) :: winExpr2.as('window2) :: Nil,
907        'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
908      .select('a, 'b, 'c, 'window1, 'window2).analyze
909
910    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
911  }
912
913  test("Window: predicate push down -- multi window specification - 1") {
914    // order by clauses are different between winSpec1 and winSpec2
915    val winSpec1 = windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
916    val winExpr1 = windowExpr(count('b), winSpec1)
917    val winSpec2 = windowSpec('a.attr :: 'b.attr :: Nil, 'a.asc :: Nil, UnspecifiedFrame)
918    val winExpr2 = windowExpr(count('b), winSpec2)
919    val originalQuery = testRelation
920      .select('a, 'b, 'c, winExpr1.as('window1), winExpr2.as('window2)).where('a > 1)
921
922    val correctAnswer1 = testRelation
923      .where('a > 1).select('a, 'b, 'c)
924      .window(winExpr1.as('window1) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
925      .window(winExpr2.as('window2) :: Nil, 'a.attr :: 'b.attr :: Nil, 'a.asc :: Nil)
926      .select('a, 'b, 'c, 'window1, 'window2).analyze
927
928    val correctAnswer2 = testRelation
929      .where('a > 1).select('a, 'b, 'c)
930      .window(winExpr2.as('window2) :: Nil, 'a.attr :: 'b.attr :: Nil, 'a.asc :: Nil)
931      .window(winExpr1.as('window1) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
932      .select('a, 'b, 'c, 'window1, 'window2).analyze
933
934    // When Analyzer adding Window operators after grouping the extracted Window Expressions
935    // based on their Partition and Order Specs, the order of Window operators is
936    // non-deterministic. Thus, we have two correct plans
937    val optimizedQuery = Optimize.execute(originalQuery.analyze)
938    try {
939      comparePlans(optimizedQuery, correctAnswer1)
940    } catch {
941      case ae: Throwable => comparePlans(optimizedQuery, correctAnswer2)
942    }
943  }
944
945  test("Window: predicate push down -- multi window specification - 2") {
946    // partitioning clauses are different between winSpec1 and winSpec2
947    val winSpec1 = windowSpec('a.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
948    val winExpr1 = windowExpr(count('b), winSpec1)
949    val winSpec2 = windowSpec('b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
950    val winExpr2 = windowExpr(count('a), winSpec2)
951    val originalQuery = testRelation
952      .select('a, winExpr1.as('window1), 'b, 'c, winExpr2.as('window2)).where('b > 1)
953
954    val correctAnswer1 = testRelation.select('a, 'b, 'c)
955      .window(winExpr1.as('window1) :: Nil, 'a.attr :: Nil, 'b.asc :: Nil)
956      .where('b > 1)
957      .window(winExpr2.as('window2) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil)
958      .select('a, 'window1, 'b, 'c, 'window2).analyze
959
960    val correctAnswer2 = testRelation.select('a, 'b, 'c)
961      .window(winExpr2.as('window2) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil)
962      .window(winExpr1.as('window1) :: Nil, 'a.attr :: Nil, 'b.asc :: Nil)
963      .where('b > 1)
964      .select('a, 'window1, 'b, 'c, 'window2).analyze
965
966    val optimizedQuery = Optimize.execute(originalQuery.analyze)
967    // When Analyzer adding Window operators after grouping the extracted Window Expressions
968    // based on their Partition and Order Specs, the order of Window operators is
969    // non-deterministic. Thus, we have two correct plans
970    try {
971      comparePlans(optimizedQuery, correctAnswer1)
972    } catch {
973      case ae: Throwable => comparePlans(optimizedQuery, correctAnswer2)
974    }
975  }
976
977  test("Window: predicate push down -- predicates with multiple partitioning columns") {
978    val winExpr =
979      windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
980
981    val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1)
982    val correctAnswer = testRelation
983      .where('a + 'b > 1).select('a, 'b, 'c)
984      .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
985      .select('a, 'b, 'c, 'window).analyze
986
987    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
988  }
989
  // Complex predicates with the same references and the same expressions as the partitioning
  // key. TODO: to enable this test, the Analyzer needs to convert expressions in filter
  // conditions into the alias that is defined for the same expression.
993  ignore("Window: predicate push down -- complex predicate with the same expressions") {
994    val winSpec = windowSpec(
995      partitionSpec = 'a.attr + 'b.attr :: Nil,
996      orderSpec = 'b.asc :: Nil,
997      UnspecifiedFrame)
998    val winExpr = windowExpr(count('b), winSpec)
999
1000    val winSpecAnalyzed = windowSpec(
1001      partitionSpec = '_w0.attr :: Nil,
1002      orderSpec = 'b.asc :: Nil,
1003      UnspecifiedFrame)
1004    val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed)
1005
1006    val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1)
1007    val correctAnswer = testRelation
1008      .where('a + 'b > 1).select('a, 'b, 'c, ('a + 'b).as("_w0"))
1009      .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: Nil, 'b.asc :: Nil)
1010      .select('a, 'b, 'c, 'window).analyze
1011
1012    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
1013  }
1014
1015  test("Window: no predicate push down -- predicates are not from partitioning keys") {
1016    val winSpec = windowSpec(
1017      partitionSpec = 'a.attr :: 'b.attr :: Nil,
1018      orderSpec = 'b.asc :: Nil,
1019      UnspecifiedFrame)
1020    val winExpr = windowExpr(count('b), winSpec)
1021
1022    // No push down: the predicate is c > 1, but the partitioning key is (a, b).
1023    val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('c > 1)
1024    val correctAnswer = testRelation.select('a, 'b, 'c)
1025      .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
1026      .where('c > 1).select('a, 'b, 'c, 'window).analyze
1027
1028    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
1029  }
1030
1031  test("Window: no predicate push down -- partial compound partition key") {
1032    val winSpec = windowSpec(
1033      partitionSpec = 'a.attr + 'b.attr :: 'b.attr :: Nil,
1034      orderSpec = 'b.asc :: Nil,
1035      UnspecifiedFrame)
1036    val winExpr = windowExpr(count('b), winSpec)
1037
1038    // No push down: the predicate is a > 1, but the partitioning key is (a + b, b)
1039    val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)
1040
1041    val winSpecAnalyzed = windowSpec(
1042      partitionSpec = '_w0.attr :: 'b.attr :: Nil,
1043      orderSpec = 'b.asc :: Nil,
1044      UnspecifiedFrame)
1045    val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed)
1046    val correctAnswer = testRelation.select('a, 'b, 'c, ('a + 'b).as("_w0"))
1047      .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: 'b.attr :: Nil, 'b.asc :: Nil)
1048      .where('a > 1).select('a, 'b, 'c, 'window).analyze
1049
1050    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
1051  }
1052
1053  test("Window: no predicate push down -- complex predicates containing non partitioning columns") {
1054    val winSpec =
1055      windowSpec(partitionSpec = 'b.attr :: Nil, orderSpec = 'b.asc :: Nil, UnspecifiedFrame)
1056    val winExpr = windowExpr(count('b), winSpec)
1057
1058    // No push down: the predicate is a + b > 1, but the partitioning key is b.
1059    val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1)
1060    val correctAnswer = testRelation
1061      .select('a, 'b, 'c)
1062      .window(winExpr.as('window) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil)
1063      .where('a + 'b > 1).select('a, 'b, 'c, 'window).analyze
1064
1065    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
1066  }
1067
1068  // complex predicates with the same references but different expressions
1069  test("Window: no predicate push down -- complex predicate with different expressions") {
1070    val winSpec = windowSpec(
1071      partitionSpec = 'a.attr + 'b.attr :: Nil,
1072      orderSpec = 'b.asc :: Nil,
1073      UnspecifiedFrame)
1074    val winExpr = windowExpr(count('b), winSpec)
1075
1076    val winSpecAnalyzed = windowSpec(
1077      partitionSpec = '_w0.attr :: Nil,
1078      orderSpec = 'b.asc :: Nil,
1079      UnspecifiedFrame)
1080    val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed)
1081
1082    // No push down: the predicate is a + b > 1, but the partitioning key is a + b.
1083    val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a - 'b > 1)
1084    val correctAnswer = testRelation.select('a, 'b, 'c, ('a + 'b).as("_w0"))
1085      .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: Nil, 'b.asc :: Nil)
1086      .where('a - 'b > 1).select('a, 'b, 'c, 'window).analyze
1087
1088    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
1089  }
1090
1091  test("join condition pushdown: deterministic and non-deterministic") {
1092    val x = testRelation.subquery('x)
1093    val y = testRelation.subquery('y)
1094
1095    // Verify that all conditions preceding the first non-deterministic condition are pushed down
1096    // by the optimizer and others are not.
1097    val originalQuery = x.join(y, condition = Some("x.a".attr === 5 && "y.a".attr === 5 &&
1098      "x.a".attr === Rand(10) && "y.b".attr === 5))
1099    val correctAnswer = x.where("x.a".attr === 5).join(y.where("y.a".attr === 5),
1100        condition = Some("x.a".attr === Rand(10) && "y.b".attr === 5))
1101
1102    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze)
1103  }
1104}
1105