/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types.IntegerType

class FilterPushdownSuite extends PlanTest {

  /**
   * Rule executor exercised by every test in this suite.
   *
   * It runs two batches in order: "Subqueries" applies `EliminateSubqueryAliases` once, then
   * "Filter Pushdown" applies the filter-related rules listed below with a `FixedPoint(10)`
   * strategy.
   */
  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches = List(
      Batch("Subqueries", Once, EliminateSubqueryAliases),
      Batch(
        "Filter Pushdown",
        FixedPoint(10),
        CombineFilters,
        PushDownPredicate,
        BooleanSimplification,
        PushPredicateThroughJoin,
        CollapseProject))
  }

  // Three-column integer relation (a, b, c) used as the default input of the tests.
  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

  // Single-column integer relation (d) used where the two join sides need distinct columns.
  val testRelation1 = LocalRelation('d.int)

  // This test already passes.
test("eliminate subqueries") {
  val originalQuery =
    testRelation
      .subquery('y)
      .select('a)

  val optimized = Optimize.execute(originalQuery.analyze)
  val correctAnswer =
    testRelation
      .select('a.attr)
      .analyze

  comparePlans(optimized, correctAnswer)
}

// NOTE(review): the marker that used to sit here read "After this line is unimplemented.",
// but every test below has a full body -- confirm the marker is stale before dropping it.
test("simple push down") {
  val originalQuery =
    testRelation
      .select('a)
      .where('a === 1)

  val optimized = Optimize.execute(originalQuery.analyze)
  val correctAnswer =
    testRelation
      .where('a === 1)
      .select('a)
      .analyze

  comparePlans(optimized, correctAnswer)
}

// Two stacked filters sharing the conjunct `a = 1` collapse into one filter without the
// duplicate.
test("combine redundant filters") {
  val originalQuery =
    testRelation
      .where('a === 1 && 'b === 1)
      .where('a === 1 && 'c === 1)

  val optimized = Optimize.execute(originalQuery.analyze)
  val correctAnswer =
    testRelation
      .where('a === 1 && 'b === 1 && 'c === 1)
      .analyze

  comparePlans(optimized, correctAnswer)
}

test("SPARK-16164: Filter pushdown should keep the ordering in the logical plan") {
  val originalQuery =
    testRelation
      .where('a === 1)
      .select('a, 'b)
      .where('b === 1)

  val optimized = Optimize.execute(originalQuery.analyze)
  val correctAnswer =
    testRelation
      .where('a === 1 && 'b === 1)
      .select('a, 'b)
      .analyze

  // We cannot use comparePlans here because it normalizes the plan.
  assert(optimized == correctAnswer)
}

test("SPARK-16994: filter should not be pushed through limit") {
  val originalQuery = testRelation.limit(10).where('a === 1).analyze
  val optimized = Optimize.execute(originalQuery)
  comparePlans(optimized, originalQuery)
}

// The filter references the alias `e`; the expected plan filters on the aliased expression
// `a + b` below the projection.
test("can't push without rewrite") {
  val originalQuery =
    testRelation
      .select('a + 'b as 'e)
      .where('e === 1)
      .analyze

  val optimized = Optimize.execute(originalQuery.analyze)
  val correctAnswer =
    testRelation
      .where('a + 'b === 1)
      .select('a + 'b as 'e)
      .analyze

  comparePlans(optimized, correctAnswer)
}

test("nondeterministic: can always push down filter through project with deterministic field") {
  val originalQuery = testRelation
    .select('a)
    .where(Rand(10) > 5 || 'a > 5)
    .analyze

  val optimized = Optimize.execute(originalQuery)

  val correctAnswer = testRelation
    .where(Rand(10) > 5 || 'a > 5)
    .select('a)
    .analyze

  comparePlans(optimized, correctAnswer)
}

test("nondeterministic: can't push down filter through project with nondeterministic field") {
  val originalQuery = testRelation
    .select(Rand(10).as('rand), 'a)
    .where('a > 5)
    .analyze

  val optimized = Optimize.execute(originalQuery)

  // The plan is expected to come back unchanged.
  comparePlans(optimized, originalQuery)
}

test("nondeterministic: can't push down filter through aggregate with nondeterministic field") {
  val originalQuery = testRelation
    .groupBy('a)('a, Rand(10).as('rand))
    .where('a > 5)
    .analyze

  val optimized = Optimize.execute(originalQuery)

  // The plan is expected to come back unchanged.
  comparePlans(optimized, originalQuery)
}

// Only the deterministic conjunct `a > 5` moves below the aggregate; `Rand(10) > 5` stays above.
test("nondeterministic: push down part of filter through aggregate with deterministic field") {
  val originalQuery = testRelation
    .groupBy('a)('a)
    .where('a > 5 && Rand(10) > 5)
    .analyze

  val optimized = Optimize.execute(originalQuery.analyze)

  val correctAnswer = testRelation
    .where('a > 5)
    .groupBy('a)('a)
    .where(Rand(10) > 5)
    .analyze

  comparePlans(optimized, correctAnswer)
}

test("filters: combines filters") {
  val originalQuery = testRelation
    .select('a)
    .where('a === 1)
    .where('a === 2)

  val optimized = Optimize.execute(originalQuery.analyze)
  val correctAnswer =
    testRelation
      .where('a === 1 && 'a === 2)
      .select('a).analyze

  comparePlans(optimized, correctAnswer)
}

test("joins: push to either side") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)

  val originalQuery = {
    x.join(y)
      .where("x.b".attr === 1)
      .where("y.b".attr === 2)
  }

  val optimized = Optimize.execute(originalQuery.analyze)
  val left = testRelation.where('b === 1)
  val right = testRelation.where('b === 2)
  val correctAnswer =
    left.join(right).analyze

  comparePlans(optimized, correctAnswer)
}

test("joins: push to one side") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)

  val originalQuery = {
    x.join(y)
      .where("x.b".attr === 1)
  }

  val optimized = Optimize.execute(originalQuery.analyze)
  val left = testRelation.where('b === 1)
  val right = testRelation
  val correctAnswer =
    left.join(right).analyze

  comparePlans(optimized, correctAnswer)
}

// Both disjuncts share `x.a = 1`; the expected plan filters the left side on it and keeps the
// remaining disjunction as the join condition.
test("joins: push to one side after transformCondition") {
  val x = testRelation.subquery('x)
  val y = testRelation1.subquery('y)

  val originalQuery = {
    x.join(y)
      .where(("x.a".attr === 1 && "y.d".attr === "x.b".attr) ||
        ("x.a".attr === 1 && "y.d".attr === "x.c".attr))
  }

  val optimized = Optimize.execute(originalQuery.analyze)
  val left = testRelation.where('a === 1)
  val right = testRelation1
  val correctAnswer =
    left.join(right, condition = Some("d".attr === "b".attr || "d".attr === "c".attr)).analyze

  comparePlans(optimized, correctAnswer)
}

test("joins: rewrite filter to push to either side") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)

  val originalQuery = {
    x.join(y)
      .where("x.b".attr === 1 && "y.b".attr === 2)
  }

  val optimized = Optimize.execute(originalQuery.analyze)
  val left = testRelation.where('b === 1)
  val right = testRelation.where('b === 2)
  val correctAnswer =
    left.join(right).analyze

  comparePlans(optimized, correctAnswer)
}

// Single-side conjuncts of the semi-join condition become filters on their own side; only the
// cross-side equality remains as the join condition.
test("joins: push down left semi join") {
  val x = testRelation.subquery('x)
  val y = testRelation1.subquery('y)

  val originalQuery = {
    x.join(y, LeftSemi, Option("x.a".attr === "y.d".attr && "x.b".attr >= 1 && "y.d".attr >= 2))
  }

  val optimized = Optimize.execute(originalQuery.analyze)
  val left = testRelation.where('b >= 1)
  val right = testRelation1.where('d >= 2)
  val correctAnswer =
    left.join(right, LeftSemi, Option("a".attr === "d".attr)).analyze

  comparePlans(optimized, correctAnswer)
}

// Left outer join: the x-side predicate moves below the join, the y-side predicate stays above.
test("joins: push down left outer join #1") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)

  val originalQuery = {
    x.join(y, LeftOuter)
      .where("x.b".attr === 1 && "y.b".attr === 2)
  }

  val optimized = Optimize.execute(originalQuery.analyze)
  val left = testRelation.where('b === 1)
  val correctAnswer =
    left.join(y, LeftOuter).where("y.b".attr === 2).analyze

  comparePlans(optimized, correctAnswer)
}

// Right outer join: the y-side predicate moves below the join, the x-side predicate stays above.
test("joins: push down right outer join #1") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)

  val originalQuery = {
    x.join(y, RightOuter)
      .where("x.b".attr === 1 && "y.b".attr === 2)
  }

  val optimized = Optimize.execute(originalQuery.analyze)
  val right = testRelation.where('b === 2).subquery('d)
  val correctAnswer =
    x.join(right, RightOuter).where("x.b".attr === 1).analyze

  comparePlans(optimized, correctAnswer)
}

// The left-side join condition stays in the join; only the left-side WHERE predicate is pushed.
test("joins: push down left outer join #2") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)

  val originalQuery = {
    x.join(y, LeftOuter, Some("x.b".attr === 1))
      .where("x.b".attr === 2 && "y.b".attr === 2)
  }

  val optimized = Optimize.execute(originalQuery.analyze)
  val left = testRelation.where('b === 2).subquery('d)
  val correctAnswer =
    left.join(y, LeftOuter, Some("d.b".attr === 1)).where("y.b".attr === 2).analyze

  comparePlans(optimized, correctAnswer)
}

// The right-side join condition stays in the join; only the right-side WHERE predicate is pushed.
test("joins: push down right outer join #2") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)

  val originalQuery = {
    x.join(y, RightOuter, Some("y.b".attr === 1))
      .where("x.b".attr === 2 && "y.b".attr === 2)
  }

  val optimized = Optimize.execute(originalQuery.analyze)
  val right = testRelation.where('b === 2).subquery('d)
  val correctAnswer =
    x.join(right, RightOuter, Some("d.b".attr === 1)).where("x.b".attr === 2).analyze

  comparePlans(optimized, correctAnswer)
}

// The right-side join condition becomes a filter on the right input, the left-side WHERE
// predicate a filter on the left input; the right-side WHERE predicate stays above the join.
test("joins: push down left outer join #3") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)

  val originalQuery = {
    x.join(y, LeftOuter, Some("y.b".attr === 1))
      .where("x.b".attr === 2 && "y.b".attr === 2)
  }

  val optimized = Optimize.execute(originalQuery.analyze)
  val left = testRelation.where('b === 2).subquery('l)
  val right = testRelation.where('b === 1).subquery('r)
  val correctAnswer =
    left.join(right, LeftOuter).where("r.b".attr === 2).analyze

  comparePlans(optimized, correctAnswer)
}

test("joins: push down right outer join #3") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)

  val originalQuery = {
    x.join(y, RightOuter, Some("y.b".attr === 1))
      .where("x.b".attr === 2 && "y.b".attr === 2)
  }

  val optimized = Optimize.execute(originalQuery.analyze)
  val right = testRelation.where('b === 2).subquery('r)
  val correctAnswer =
    x.join(right, RightOuter, Some("r.b".attr === 1)).where("x.b".attr === 2).analyze

  comparePlans(optimized, correctAnswer)
}

// Same as #3 plus a cross-side WHERE predicate, which has to stay above the join.
test("joins: push down left outer join #4") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)

  val originalQuery = {
    x.join(y, LeftOuter, Some("y.b".attr === 1))
      .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
  }

  val optimized = Optimize.execute(originalQuery.analyze)
  val left = testRelation.where('b === 2).subquery('l)
  val right = testRelation.where('b === 1).subquery('r)
  val correctAnswer =
    left.join(right, LeftOuter).where("r.b".attr === 2 && "l.c".attr === "r.c".attr).analyze

  comparePlans(optimized, correctAnswer)
}

test("joins: push down right outer join #4") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)

  val originalQuery = {
    x.join(y, RightOuter, Some("y.b".attr === 1))
      .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
  }

  val optimized = Optimize.execute(originalQuery.analyze)
  val left = testRelation.subquery('l)
  val right = testRelation.where('b === 2).subquery('r)
  val correctAnswer =
    left.join(right, RightOuter, Some("r.b".attr === 1))
      .where("l.b".attr === 2 && "l.c".attr === "r.c".attr).analyze

  comparePlans(optimized, correctAnswer)
}

// The join condition mixes both sides: the right-side part is pushed into the right input,
// the left-side part stays as the join condition.
test("joins: push down left outer join #5") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)

  val originalQuery = {
    x.join(y, LeftOuter, Some("y.b".attr === 1 && "x.a".attr === 3))
      .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
  }

  val optimized = Optimize.execute(originalQuery.analyze)
  val left = testRelation.where('b === 2).subquery('l)
  val right = testRelation.where('b === 1).subquery('r)
  val correctAnswer =
    left.join(right, LeftOuter, Some("l.a".attr === 3))
      .where("r.b".attr === 2 && "l.c".attr === "r.c".attr).analyze

  comparePlans(optimized, correctAnswer)
}

// Mirror image of the left outer case: the left-side part of the join condition is pushed
// into the left input, the right-side part stays as the join condition.
test("joins: push down right outer join #5") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)

  val originalQuery = {
    x.join(y, RightOuter, Some("y.b".attr === 1 && "x.a".attr === 3))
      .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
  }

  val optimized = Optimize.execute(originalQuery.analyze)
  val left = testRelation.where('a === 3).subquery('l)
  val right = testRelation.where('b === 2).subquery('r)
  val correctAnswer =
    left.join(right, RightOuter, Some("r.b".attr === 1))
      .where("l.b".attr === 2 && "l.c".attr === "r.c".attr).analyze

  comparePlans(optimized, correctAnswer)
}

test("joins: can't push down") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)

  val originalQuery = {
    x.join(y, condition = Some("x.b".attr === "y.b".attr))
  }
  val optimized = Optimize.execute(originalQuery.analyze)

  // Apart from the removed subquery aliases, the plan is expected to be unchanged.
  comparePlans(analysis.EliminateSubqueryAliases(originalQuery.analyze), optimized)
}

test("joins: conjunctive predicates") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)

  val originalQuery = {
    x.join(y)
      .where(("x.b".attr === "y.b".attr) && ("x.a".attr === 1) && ("y.a".attr === 1))
  }

  val optimized = Optimize.execute(originalQuery.analyze)
  val left = testRelation.where('a === 1).subquery('x)
  val right = testRelation.where('a === 1).subquery('y)
  val correctAnswer =
    left.join(right, condition = Some("x.b".attr === "y.b".attr))
      .analyze

  comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}

test("joins: conjunctive predicates #2") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)

  val originalQuery = {
    x.join(y)
      .where(("x.b".attr === "y.b".attr) && ("x.a".attr === 1))
  }

  val optimized = Optimize.execute(originalQuery.analyze)
  val left = testRelation.where('a === 1).subquery('x)
  val right = testRelation.subquery('y)
  val correctAnswer =
    left.join(right, condition = Some("x.b".attr === "y.b".attr))
      .analyze

  comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}

// Nested joins: each conjunct ends up at the lowest join or input that covers its references.
test("joins: conjunctive predicates #3") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)
  val z = testRelation.subquery('z)

  val originalQuery = {
    z.join(x.join(y))
      .where(("x.b".attr === "y.b".attr) && ("x.a".attr === 1) &&
        ("z.a".attr >= 3) && ("z.a".attr === "x.b".attr))
  }

  val optimized = Optimize.execute(originalQuery.analyze)
  val lleft = testRelation.where('a >= 3).subquery('z)
  val left = testRelation.where('a === 1).subquery('x)
  val right = testRelation.subquery('y)
  val correctAnswer =
    lleft.join(
      left.join(right, condition = Some("x.b".attr === "y.b".attr)),
      condition = Some("z.a".attr === "x.b".attr))
      .analyze

  comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}

test("joins: push down where clause into left anti join") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)
  val originalQuery =
    x.join(y, LeftAnti, Some("x.b".attr === "y.b".attr))
      .where("x.a".attr > 10)
      .analyze
  val optimized = Optimize.execute(originalQuery)
  val correctAnswer =
    x.where("x.a".attr > 10)
      .join(y, LeftAnti, Some("x.b".attr === "y.b".attr))
      .analyze
  comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}

// Only the right-side conjunct of the anti-join condition is pushed; the left-side conjunct
// stays in the join condition.
test("joins: only push down join conditions to the right of a left anti join") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)
  val originalQuery =
    x.join(y,
      LeftAnti,
      Some("x.b".attr === "y.b".attr && "y.a".attr > 10 && "x.a".attr > 10)).analyze
  val optimized = Optimize.execute(originalQuery)
  val correctAnswer =
    x.join(
      y.where("y.a".attr > 10),
      LeftAnti,
      Some("x.b".attr === "y.b".attr && "x.a".attr > 10))
      .analyze
  comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}

test("joins: only push down join conditions to the right of an existence join") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)
  val fillerVal = 'val.boolean
  val originalQuery =
    x.join(y,
      ExistenceJoin(fillerVal),
      Some("x.a".attr > 1 && "y.b".attr > 2)).analyze
  val optimized = Optimize.execute(originalQuery)
  val correctAnswer =
    x.join(
      y.where("y.b".attr > 2),
      ExistenceJoin(fillerVal),
      Some("x.a".attr > 1))
      .analyze
  comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}

// Relation with an integer-array column, used as the input of the Generate (explode) tests.
val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))

test("generate: predicate referenced no generated column") {
  val originalQuery = {
    testRelationWithArrayType
      .generate(Explode('c_arr), true, false, Some("arr"))
      .where(('b >= 5) && ('a > 6))
  }
  val optimized = Optimize.execute(originalQuery.analyze)
  val correctAnswer = {
    testRelationWithArrayType
      .where(('b >= 5) && ('a > 6))
      .generate(Explode('c_arr), true, false, Some("arr")).analyze
  }

  comparePlans(optimized, correctAnswer)
}

// Only the conjunct in front of the non-deterministic one is pushed below the generator.
test("generate: non-deterministic predicate referenced no generated column") {
  val originalQuery = {
    testRelationWithArrayType
      .generate(Explode('c_arr), true, false, Some("arr"))
      .where(('b >= 5) && ('a + Rand(10).as("rnd") > 6) && ('c > 6))
  }
  val optimized = Optimize.execute(originalQuery.analyze)
  val correctAnswer = {
    testRelationWithArrayType
      .where('b >= 5)
      .generate(Explode('c_arr), true, false, Some("arr"))
      .where('a + Rand(10).as("rnd") > 6 && 'c > 6)
      .analyze
  }

  comparePlans(optimized, correctAnswer)
}

test("generate: part of conjuncts referenced generated column") {
  val generator = Explode('c_arr)
  val originalQuery = {
    testRelationWithArrayType
      .generate(generator, true, false, Some("arr"))
      .where(('b >= 5) && ('c > 6))
  }
  val optimized = Optimize.execute(originalQuery.analyze)
  val referenceResult = {
    testRelationWithArrayType
      .where('b >= 5)
      .generate(generator, true, false, Some("arr"))
      .where('c > 6).analyze
  }

  // Since newly generated columns get different ids every time being analyzed
  // e.g. comparePlans(originalQuery.analyze, originalQuery.analyze) fails.
  // So we check operators manually here.
  // Filter("c" > 6)
  assertResult(classOf[Filter])(optimized.getClass)
  assertResult(1)(optimized.asInstanceOf[Filter].condition.references.size)
  assertResult("c") {
    optimized.asInstanceOf[Filter].condition.references.toSeq(0).name
  }

  // the rest part
  comparePlans(optimized.children(0), referenceResult.children(0))
}

test("generate: all conjuncts referenced generated column") {
  val originalQuery = {
    testRelationWithArrayType
      .generate(Explode('c_arr), true, false, Some("arr"))
      .where(('c > 6) || ('b > 5)).analyze
  }
  val optimized = Optimize.execute(originalQuery)

  // The plan is expected to come back unchanged.
  comparePlans(optimized, originalQuery)
}

test("aggregate: push down filter when filter on group by expression") {
  val originalQuery = testRelation
    .groupBy('a)('a, count('b) as 'c)
    .select('a, 'c)
    .where('a === 2)

  val optimized = Optimize.execute(originalQuery.analyze)

  val correctAnswer = testRelation
    .where('a === 2)
    .groupBy('a)('a, count('b) as 'c)
    .analyze
  comparePlans(optimized, correctAnswer)
}

test("aggregate: don't push down filter when filter not on group by expression") {
  val originalQuery = testRelation
    .select('a, 'b)
    .groupBy('a)('a, count('b) as 'c)
    .where('c === 2L)

  val optimized = Optimize.execute(originalQuery.analyze)

  comparePlans(optimized, originalQuery.analyze)
}

// The conjunct on the grouping column `a` is pushed below the aggregate; the one on the
// aggregate result `c` stays above it.
test("aggregate: push down filters partially which are subset of group by expressions") {
  val originalQuery = testRelation
    .select('a, 'b)
    .groupBy('a)('a, count('b) as 'c)
    .where('c === 2L && 'a === 3)

  val optimized = Optimize.execute(originalQuery.analyze)

  val correctAnswer = testRelation
    .where('a === 3)
    .select('a, 'b)
    .groupBy('a)('a, count('b) as 'c)
    .where('c === 2L)
    .analyze

  comparePlans(optimized, correctAnswer)
}

// The pushed conjunct is rewritten from the alias `aa` to its definition `a + 1`.
test("aggregate: push down filters with alias") {
  val originalQuery = testRelation
    .select('a, 'b)
    .groupBy('a)(('a + 1) as 'aa, count('b) as 'c)
    .where(('c === 2L || 'aa > 4) && 'aa < 3)

  val optimized = Optimize.execute(originalQuery.analyze)

  val correctAnswer = testRelation
    .where('a + 1 < 3)
    .select('a, 'b)
    .groupBy('a)(('a + 1) as 'aa, count('b) as 'c)
    .where('c === 2L || 'aa > 4)
    .analyze

  comparePlans(optimized, correctAnswer)
}

test("aggregate: push down filters with literal") {
  val originalQuery = testRelation
    .select('a, 'b)
    .groupBy('a)('a, count('b) as 'c, "s" as 'd)
    .where('c === 2L && 'd === "s")

  val optimized = Optimize.execute(originalQuery.analyze)

  val correctAnswer = testRelation
    .where("s" === "s")
    .select('a, 'b)
    .groupBy('a)('a, count('b) as 'c, "s" as 'd)
    .where('c === 2L)
    .analyze

  comparePlans(optimized, correctAnswer)
}

test("aggregate: don't push down filters that are nondeterministic") {
  val originalQuery = testRelation
    .select('a, 'b)
    .groupBy('a)('a + Rand(10) as 'aa, count('b) as 'c, Rand(11).as("rnd"))
    .where('c === 2L && 'aa + Rand(10).as("rnd") === 3 && 'rnd === 5)

  val optimized = Optimize.execute(originalQuery.analyze)

  val correctAnswer = testRelation
    .select('a, 'b)
    .groupBy('a)('a + Rand(10) as 'aa, count('b) as 'c, Rand(11).as("rnd"))
    .where('c === 2L && 'aa + Rand(10).as("rnd") === 3 && 'rnd === 5)
    .analyze

  comparePlans(optimized, correctAnswer)
}

test("SPARK-17712: aggregate: don't push down filters that are data-independent") {
  val originalQuery = LocalRelation.apply(testRelation.output, Seq.empty)
    .select('a, 'b)
    .groupBy('a)(count('a))
    .where(false)

  val optimized = Optimize.execute(originalQuery.analyze)

  val correctAnswer = testRelation
    .select('a, 'b)
    .groupBy('a)(count('a))
    .where(false)
    .analyze

  comparePlans(optimized, correctAnswer)
}

// The deterministic conjunct moves below the hint; the one involving Rand stays above it.
test("broadcast hint") {
  val originalQuery = BroadcastHint(testRelation)
    .where('a === 2L && 'b + Rand(10).as("rnd") === 3)

  val optimized = Optimize.execute(originalQuery.analyze)

  val correctAnswer = BroadcastHint(testRelation.where('a === 2L))
    .where('b + Rand(10).as("rnd") === 3)
    .analyze

  comparePlans(optimized, correctAnswer)
}

// The leading deterministic conjunct is pushed into every union branch, rewritten to that
// branch's own column (`a` for the first, `d` for the second); the rest stays above the union.
test("union") {
  val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)

  val originalQuery = Union(Seq(testRelation, testRelation2))
    .where('a === 2L && 'b + Rand(10).as("rnd") === 3 && 'c > 5L)

  val optimized = Optimize.execute(originalQuery.analyze)

  val correctAnswer = Union(Seq(
    testRelation.where('a === 2L),
    testRelation2.where('d === 2L)))
    .where('b + Rand(10).as("rnd") === 3 && 'c > 5L)
    .analyze

  comparePlans(optimized, correctAnswer)
}

test("expand") {
  val agg = testRelation
    .groupBy(Cube(Seq('a, 'b)))('a, 'b, sum('c))
    .analyze
    .asInstanceOf[Aggregate]

  val a = agg.output(0)
  val b = agg.output(1)

  val query = agg.where(a > 1 && b > 2)
  val optimized = Optimize.execute(query)
  // The filter is expected directly below the aggregate, on top of the aggregate's child.
  val correctedAnswer = agg.copy(child = agg.child.where(a > 1 && b > 2)).analyze
  comparePlans(optimized, correctedAnswer)
}

test("predicate subquery: push down simple") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)
  val z = LocalRelation('a.int, 'b.int, 'c.int).subquery('z)

  val query = x
    .join(y, Inner, Option("x.a".attr === "y.a".attr))
    .where(Exists(z.where("x.a".attr === "z.a".attr)))
    .analyze
  val answer = x
    .where(Exists(z.where("x.a".attr === "z.a".attr)))
    .join(y, Inner, Option("x.a".attr === "y.a".attr))
    .analyze
  val optimized = Optimize.execute(Optimize.execute(query))
  comparePlans(optimized, answer)
}

test("predicate subquery: push down complex") {
  val w = testRelation.subquery('w)
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)
  val z = LocalRelation('a.int, 'b.int, 'c.int).subquery('z)

  val query = w
    .join(x, Inner, Option("w.a".attr === "x.a".attr))
    .join(y, LeftOuter, Option("x.a".attr === "y.a".attr))
    .where(Exists(z.where("w.a".attr === "z.a".attr)))
    .analyze
  val answer = w
    .where(Exists(z.where("w.a".attr === "z.a".attr)))
    .join(x, Inner, Option("w.a".attr === "x.a".attr))
    .join(y, LeftOuter, Option("x.a".attr === "y.a".attr))
    .analyze
  val optimized = Optimize.execute(Optimize.execute(query))
  comparePlans(optimized, answer)
}

test("Window: predicate push down -- basic") {
  val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame))

  val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)
  val correctAnswer = testRelation
    .where('a > 1).select('a, 'b, 'c)
    .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
    .select('a, 'b, 'c, 'window).analyze

  comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}

test("Window: predicate push down -- predicates with compound predicate using only one column") {
  val winExpr =
    windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame))

  val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a * 3 > 15)
  val correctAnswer = testRelation
    .where('a * 3 > 15).select('a, 'b, 'c)
    .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
    .select('a, 'b, 'c, 'window).analyze

  comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}

test("Window: predicate push down -- multi window expressions with the same window spec") {
  val winSpec = windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
  val winExpr1 = windowExpr(count('b), winSpec)
  val winExpr2 = windowExpr(sum('b), winSpec)
  val originalQuery = testRelation
    .select('a, 'b, 'c, winExpr1.as('window1), winExpr2.as('window2)).where('a > 1)

  val correctAnswer = testRelation
    .where('a > 1).select('a, 'b, 'c)
    .window(winExpr1.as('window1) :: winExpr2.as('window2) :: Nil,
      'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
    .select('a, 'b, 'c, 'window1, 'window2).analyze

  comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}

test("Window: predicate push down -- multi window specification - 1") {
  // order by clauses are different between winSpec1 and winSpec2
  val winSpec1 = windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
  val winExpr1 = windowExpr(count('b), winSpec1)
  val winSpec2 = windowSpec('a.attr :: 'b.attr :: Nil, 'a.asc :: Nil, UnspecifiedFrame)
  val winExpr2 = windowExpr(count('b), winSpec2)
  val originalQuery = testRelation
    .select('a, 'b, 'c, winExpr1.as('window1), winExpr2.as('window2)).where('a > 1)

  val correctAnswer1 = testRelation
    .where('a > 1).select('a, 'b, 'c)
    .window(winExpr1.as('window1) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
    .window(winExpr2.as('window2) :: Nil, 'a.attr :: 'b.attr :: Nil, 'a.asc :: Nil)
    .select('a, 'b, 'c, 'window1, 'window2).analyze

  val correctAnswer2 = testRelation
    .where('a > 1).select('a, 'b, 'c)
    .window(winExpr2.as('window2) :: Nil, 'a.attr :: 'b.attr :: Nil, 'a.asc :: Nil)
    .window(winExpr1.as('window1) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
    .select('a, 'b, 'c, 'window1, 'window2).analyze

  // When the Analyzer adds Window operators after grouping the extracted Window Expressions
  // based on their Partition and Order Specs, the order of the Window operators is
  // non-deterministic. Thus, we have two correct plans.
  val optimizedQuery = Optimize.execute(originalQuery.analyze)
  try {
    comparePlans(optimizedQuery, correctAnswer1)
  } catch {
    // Fall back to the second accepted plan when the first comparison fails. NonFatal lets
    // fatal JVM errors propagate instead of being swallowed by the retry.
    case scala.util.control.NonFatal(_) => comparePlans(optimizedQuery, correctAnswer2)
  }
}

test("Window: predicate push down -- multi window specification - 2") {
  // partitioning clauses are different between winSpec1 and winSpec2
  val winSpec1 = windowSpec('a.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
  val winExpr1 = windowExpr(count('b), winSpec1)
  val winSpec2 = windowSpec('b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
  val winExpr2 = windowExpr(count('a), winSpec2)
  val originalQuery = testRelation
    .select('a, winExpr1.as('window1), 'b, 'c, winExpr2.as('window2)).where('b > 1)

  val correctAnswer1 = testRelation.select('a, 'b, 'c)
    .window(winExpr1.as('window1) :: Nil, 'a.attr :: Nil, 'b.asc :: Nil)
    .where('b > 1)
    .window(winExpr2.as('window2) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil)
    .select('a, 'window1, 'b, 'c, 'window2).analyze

  val correctAnswer2 = testRelation.select('a, 'b, 'c)
    .window(winExpr2.as('window2) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil)
    .window(winExpr1.as('window1) :: Nil, 'a.attr :: Nil, 'b.asc :: Nil)
    .where('b > 1)
    .select('a, 'window1, 'b, 'c, 'window2).analyze

  val optimizedQuery = Optimize.execute(originalQuery.analyze)
  // When the Analyzer adds Window operators after grouping the extracted Window Expressions
  // based on their Partition and Order Specs, the order of the Window operators is
  // non-deterministic. Thus, we have two correct plans.
  try {
    comparePlans(optimizedQuery, correctAnswer1)
  } catch {
    // Fall back to the second accepted plan when the first comparison fails. NonFatal lets
    // fatal JVM errors propagate instead of being swallowed by the retry.
    case scala.util.control.NonFatal(_) => comparePlans(optimizedQuery, correctAnswer2)
  }
}

test("Window: predicate push down -- predicates with multiple partitioning columns") {
  val winExpr =
    windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame))

  val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1)
  val correctAnswer = testRelation
    .where('a + 'b > 1).select('a, 'b, 'c)
    .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
    .select('a, 'b, 'c, 'window).analyze

  comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}

// complex predicates with the same references and the same expressions
// TODO: in Analyzer, to enable it, we need to convert the expression in conditions
// to the alias that is defined as the same expression
ignore("Window: predicate push down -- complex predicate with the same expressions") {
  val winSpec = windowSpec(
    partitionSpec = 'a.attr + 'b.attr :: Nil,
    orderSpec = 'b.asc :: Nil,
    UnspecifiedFrame)
  val winExpr = windowExpr(count('b), winSpec)

  val winSpecAnalyzed = windowSpec(
    partitionSpec = '_w0.attr :: Nil,
    orderSpec = 'b.asc :: Nil,
    UnspecifiedFrame)
  val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed)

  val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1)
  val correctAnswer = testRelation
    .where('a + 'b > 1).select('a, 'b, 'c, ('a + 'b).as("_w0"))
    .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: Nil, 'b.asc :: Nil)
    .select('a, 'b, 'c, 'window).analyze

  comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}

test("Window: no predicate push down -- predicates are not from partitioning keys") {
  val winSpec = windowSpec(
    partitionSpec = 'a.attr :: 'b.attr :: Nil,
    orderSpec = 'b.asc :: Nil,
    UnspecifiedFrame)
  val winExpr = windowExpr(count('b), winSpec)

  // No push down: the predicate is c > 1, but the partitioning key is (a, b).
  val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('c > 1)
  val correctAnswer = testRelation.select('a, 'b, 'c)
    .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
    .where('c > 1).select('a, 'b, 'c, 'window).analyze

  comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}

test("Window: no predicate push down -- partial compound partition key") {
  val winSpec = windowSpec(
    partitionSpec = 'a.attr + 'b.attr :: 'b.attr :: Nil,
    orderSpec = 'b.asc :: Nil,
    UnspecifiedFrame)
  val winExpr = windowExpr(count('b), winSpec)

  // No push down: the predicate is a > 1, but the partitioning key is (a + b, b)
  val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)

  val winSpecAnalyzed = windowSpec(
    partitionSpec = '_w0.attr :: 'b.attr :: Nil,
    orderSpec = 'b.asc :: Nil,
    UnspecifiedFrame)
  val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed)
  val correctAnswer = testRelation.select('a, 'b, 'c, ('a + 'b).as("_w0"))
    .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: 'b.attr :: Nil, 'b.asc :: Nil)
    .where('a > 1).select('a, 'b, 'c, 'window).analyze

  comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}

test("Window: no predicate push down -- complex predicates containing non partitioning columns") {
  val winSpec =
    windowSpec(partitionSpec = 'b.attr :: Nil, orderSpec = 'b.asc :: Nil, UnspecifiedFrame)
  val winExpr = windowExpr(count('b), winSpec)

  // No push down: the predicate is a + b > 1, but the partitioning key is b.
  val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1)
  val correctAnswer = testRelation
    .select('a, 'b, 'c)
    .window(winExpr.as('window) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil)
    .where('a + 'b > 1).select('a, 'b, 'c, 'window).analyze

  comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}

// complex predicates with the same references but different expressions
test("Window: no predicate push down -- complex predicate with different expressions") {
  val winSpec = windowSpec(
    partitionSpec = 'a.attr + 'b.attr :: Nil,
    orderSpec = 'b.asc :: Nil,
    UnspecifiedFrame)
  val winExpr = windowExpr(count('b), winSpec)

  val winSpecAnalyzed = windowSpec(
    partitionSpec = '_w0.attr :: Nil,
    orderSpec = 'b.asc :: Nil,
    UnspecifiedFrame)
  val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed)

  // No push down: the predicate is a - b > 1, but the partitioning key is a + b.
  val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a - 'b > 1)
  val correctAnswer = testRelation.select('a, 'b, 'c, ('a + 'b).as("_w0"))
    .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: Nil, 'b.asc :: Nil)
    .where('a - 'b > 1).select('a, 'b, 'c, 'window).analyze

  comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}

test("join condition pushdown: deterministic and non-deterministic") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)

  // Verify that all conditions preceding the first non-deterministic condition are pushed down
  // by the optimizer and others are not.
1097 val originalQuery = x.join(y, condition = Some("x.a".attr === 5 && "y.a".attr === 5 && 1098 "x.a".attr === Rand(10) && "y.b".attr === 5)) 1099 val correctAnswer = x.where("x.a".attr === 5).join(y.where("y.a".attr === 5), 1100 condition = Some("x.a".attr === Rand(10) && "y.b".attr === 5)) 1101 1102 comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze) 1103 } 1104} 1105