// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gmock/gmock-matchers.h>

#include <functional>
#include <memory>

#include "arrow/compute/exec.h"
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/expression.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/test_util.h"
#include "arrow/compute/exec/util.h"
#include "arrow/io/util_internal.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
#include "arrow/testing/random.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/logging.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/vector.h"

using testing::ElementsAre;
using testing::ElementsAreArray;
using testing::HasSubstr;
using testing::Optional;
using testing::UnorderedElementsAreArray;

namespace arrow {

namespace compute {

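// Plan construction tests: build small graphs of dummy nodes and verify
// ExecPlan::Validate(), sources(), and sinks().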
TEST(ExecPlanConstruction, Empty) {
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());

  ASSERT_THAT(plan->Validate(), Raises(StatusCode::Invalid));
}

TEST(ExecPlanConstruction, SingleNode) {
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  auto node = MakeDummyNode(plan.get(), "dummy", /*inputs=*/{}, /*num_outputs=*/0);
  ASSERT_OK(plan->Validate());
  ASSERT_THAT(plan->sources(), ElementsAre(node));
  ASSERT_THAT(plan->sinks(), ElementsAre(node));

  ASSERT_OK_AND_ASSIGN(plan, ExecPlan::Make());
  node = MakeDummyNode(plan.get(), "dummy", /*inputs=*/{}, /*num_outputs=*/1);
  // Output not bound
  ASSERT_THAT(plan->Validate(), Raises(StatusCode::Invalid));
}

TEST(ExecPlanConstruction, SourceSink) {
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  auto source = MakeDummyNode(plan.get(), "source", /*inputs=*/{}, /*num_outputs=*/1);
  auto sink = MakeDummyNode(plan.get(), "sink", /*inputs=*/{source}, /*num_outputs=*/0);

  ASSERT_OK(plan->Validate());
  EXPECT_THAT(plan->sources(), ElementsAre(source));
  EXPECT_THAT(plan->sinks(), ElementsAre(sink));
}

TEST(ExecPlanConstruction, MultipleNode) {
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());

  auto source1 = MakeDummyNode(plan.get(), "source1", /*inputs=*/{}, /*num_outputs=*/2);

  auto source2 = MakeDummyNode(plan.get(), "source2", /*inputs=*/{}, /*num_outputs=*/1);

  auto process1 =
      MakeDummyNode(plan.get(), "process1", /*inputs=*/{source1}, /*num_outputs=*/2);

  auto process2 = MakeDummyNode(plan.get(), "process2", /*inputs=*/{source1, source2},
                                /*num_outputs=*/1);

  auto process3 =
      MakeDummyNode(plan.get(), "process3", /*inputs=*/{process1, process2, process1},
                    /*num_outputs=*/1);

  auto sink = MakeDummyNode(plan.get(), "sink", /*inputs=*/{process3}, /*num_outputs=*/0);

  ASSERT_OK(plan->Validate());
  ASSERT_THAT(plan->sources(), ElementsAre(source1, source2));
  ASSERT_THAT(plan->sinks(), ElementsAre(sink));
}

TEST(ExecPlanConstruction, AutoLabel) {
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  auto source1 = MakeDummyNode(plan.get(), "", /*inputs=*/{}, /*num_outputs=*/2);
  auto source2 =
      MakeDummyNode(plan.get(), "some_label", /*inputs=*/{}, /*num_outputs=*/1);
  auto source3 = MakeDummyNode(plan.get(), "", /*inputs=*/{}, /*num_outputs=*/2);

  ASSERT_EQ("0", source1->label());
  ASSERT_EQ("some_label", source2->label());
  ASSERT_EQ("2", source3->label());
}

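// Records the labels of nodes as they start and stop producing, so the tests below
// can assert on the (reverse) topological order used by StartProducing/StopProducing.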
struct StartStopTracker {
  std::vector<std::string> started, stopped;

  StartProducingFunc start_producing_func(Status st = Status::OK()) {
    return [this, st](ExecNode* node) {
      started.push_back(node->label());
      return st;
    };
  }

  StopProducingFunc stop_producing_func() {
    return [this](ExecNode* node) { stopped.push_back(node->label()); };
  }
};

TEST(ExecPlan, DummyStartProducing) {
  StartStopTracker t;

  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());

  auto source1 = MakeDummyNode(plan.get(), "source1", /*inputs=*/{}, /*num_outputs=*/2,
                               t.start_producing_func(), t.stop_producing_func());

  auto source2 = MakeDummyNode(plan.get(), "source2", /*inputs=*/{}, /*num_outputs=*/1,
                               t.start_producing_func(), t.stop_producing_func());

  auto process1 =
      MakeDummyNode(plan.get(), "process1", /*inputs=*/{source1}, /*num_outputs=*/2,
                    t.start_producing_func(), t.stop_producing_func());

  auto process2 =
      MakeDummyNode(plan.get(), "process2", /*inputs=*/{process1, source2},
                    /*num_outputs=*/1, t.start_producing_func(), t.stop_producing_func());

  auto process3 =
      MakeDummyNode(plan.get(), "process3", /*inputs=*/{process1, source1, process2},
                    /*num_outputs=*/1, t.start_producing_func(), t.stop_producing_func());

  MakeDummyNode(plan.get(), "sink", /*inputs=*/{process3}, /*num_outputs=*/0,
                t.start_producing_func(), t.stop_producing_func());

  ASSERT_OK(plan->Validate());
  ASSERT_EQ(t.started.size(), 0);
  ASSERT_EQ(t.stopped.size(), 0);

  ASSERT_OK(plan->StartProducing());
  // Note that any correct reverse topological order may do
  ASSERT_THAT(t.started, ElementsAre("sink", "process3", "process2", "process1",
                                     "source2", "source1"));

  plan->StopProducing();
  ASSERT_THAT(plan->finished(), Finishes(Ok()));
  // Note that any correct topological order may do
  ASSERT_THAT(t.stopped, ElementsAre("source1", "source2", "process1", "process2",
                                     "process3", "sink"));

  ASSERT_THAT(plan->StartProducing(),
              Raises(StatusCode::Invalid, HasSubstr("restarted")));
}

TEST(ExecPlan, DummyStartProducingError) {
  StartStopTracker t;

  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  auto source1 = MakeDummyNode(
      plan.get(), "source1", /*inputs=*/{}, /*num_outputs=*/2,
      t.start_producing_func(Status::NotImplemented("zzz")), t.stop_producing_func());

  auto source2 =
      MakeDummyNode(plan.get(), "source2", /*inputs=*/{}, /*num_outputs=*/1,
                    t.start_producing_func(), t.stop_producing_func());

  auto process1 = MakeDummyNode(
      plan.get(), "process1", /*inputs=*/{source1}, /*num_outputs=*/2,
      t.start_producing_func(Status::IOError("xxx")), t.stop_producing_func());

  auto process2 =
      MakeDummyNode(plan.get(), "process2", /*inputs=*/{process1, source2},
                    /*num_outputs=*/1, t.start_producing_func(), t.stop_producing_func());

  auto process3 =
      MakeDummyNode(plan.get(), "process3", /*inputs=*/{process1, source1, process2},
                    /*num_outputs=*/1, t.start_producing_func(), t.stop_producing_func());

  MakeDummyNode(plan.get(), "sink", /*inputs=*/{process3}, /*num_outputs=*/0,
                t.start_producing_func(), t.stop_producing_func());

  ASSERT_OK(plan->Validate());
  ASSERT_EQ(t.started.size(), 0);
  ASSERT_EQ(t.stopped.size(), 0);

  // `process1` raises IOError
  ASSERT_THAT(plan->StartProducing(), Raises(StatusCode::IOError));
  ASSERT_THAT(t.started, ElementsAre("sink", "process3", "process2", "process1"));
  // Nodes that started successfully were stopped in reverse order
  ASSERT_THAT(t.stopped, ElementsAre("process2", "process3", "sink"));
}

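// Execution tests: run small plans end to end, mostly across the slow/fast and
// serial/parallel matrix provided by the test data's gen(parallel, slow) generator.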
TEST(ExecPlanExecution, SourceSink) {
  for (bool slow : {false, true}) {
    SCOPED_TRACE(slow ? "slowed" : "unslowed");

    for (bool parallel : {false, true}) {
      SCOPED_TRACE(parallel ? "parallel" : "single threaded");

      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
      AsyncGenerator<util::optional<ExecBatch>> sink_gen;

      auto basic_data = MakeBasicBatches();

      ASSERT_OK(Declaration::Sequence(
                    {
                        {"source", SourceNodeOptions{basic_data.schema,
                                                     basic_data.gen(parallel, slow)}},
                        {"sink", SinkNodeOptions{&sink_gen}},
                    })
                    .AddToPlan(plan.get()));

      ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                  Finishes(ResultWith(UnorderedElementsAreArray(basic_data.batches))));
    }
  }
}

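// The sink pauses the source (closes the backpressure toggle) once more than
// kPauseIfAbove batches are queued, and resumes it once the queue drops below
// kResumeIfBelow.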
TEST(ExecPlanExecution, SinkNodeBackpressure) {
  constexpr uint32_t kPauseIfAbove = 4;
  constexpr uint32_t kResumeIfBelow = 2;
  EXPECT_OK_AND_ASSIGN(std::shared_ptr<ExecPlan> plan, ExecPlan::Make());
  PushGenerator<util::optional<ExecBatch>> batch_producer;
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
  util::BackpressureOptions backpressure_options =
      util::BackpressureOptions::Make(kResumeIfBelow, kPauseIfAbove);
  std::shared_ptr<Schema> schema_ = schema({field("data", uint32())});
  ARROW_EXPECT_OK(compute::Declaration::Sequence(
                      {
                          {"source", SourceNodeOptions(schema_, batch_producer)},
                          {"sink", SinkNodeOptions{&sink_gen, backpressure_options}},
                      })
                      .AddToPlan(plan.get()));
  ARROW_EXPECT_OK(plan->StartProducing());

  EXPECT_OK_AND_ASSIGN(util::optional<ExecBatch> batch, ExecBatch::Make({MakeScalar(0)}));
  ASSERT_TRUE(backpressure_options.toggle->IsOpen());

  // Should be able to push kPauseIfAbove batches without triggering backpressure
  for (uint32_t i = 0; i < kPauseIfAbove; i++) {
    batch_producer.producer().Push(batch);
  }
  SleepABit();
  ASSERT_TRUE(backpressure_options.toggle->IsOpen());

  // One more batch should trigger backpressure
  batch_producer.producer().Push(batch);
  BusyWait(10, [&] { return !backpressure_options.toggle->IsOpen(); });
  ASSERT_FALSE(backpressure_options.toggle->IsOpen());

  // Read as much as we can while the source remains paused
  for (uint32_t i = kPauseIfAbove; i >= kResumeIfBelow; i--) {
    ASSERT_FINISHES_OK(sink_gen());
  }
  SleepABit();
  ASSERT_FALSE(backpressure_options.toggle->IsOpen());

  // Reading one more item should reopen the backpressure toggle
  ASSERT_FINISHES_OK(sink_gen());
  BusyWait(10, [&] { return backpressure_options.toggle->IsOpen(); });
  ASSERT_TRUE(backpressure_options.toggle->IsOpen());

  // Cleanup
  batch_producer.producer().Push(IterationEnd<util::optional<ExecBatch>>());
  plan->StopProducing();
  ASSERT_FINISHES_OK(plan->finished());
}

TEST(ExecPlan, ToString) {
  auto basic_data = MakeBasicBatches();
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;

  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  ASSERT_OK(Declaration::Sequence(
                {
                    {"source", SourceNodeOptions{basic_data.schema,
                                                 basic_data.gen(/*parallel=*/false,
                                                                /*slow=*/false)}},
                    {"sink", SinkNodeOptions{&sink_gen}},
                })
                .AddToPlan(plan.get()));
  EXPECT_EQ(plan->sources()[0]->ToString(), R"(SourceNode{"source", outputs=["sink"]})");
  EXPECT_EQ(plan->sinks()[0]->ToString(),
            R"(SinkNode{"sink", inputs=[collected: "source"]})");
  EXPECT_EQ(plan->ToString(), R"(ExecPlan with 2 nodes:
SourceNode{"source", outputs=["sink"]}
SinkNode{"sink", inputs=[collected: "source"]}
)");

  ASSERT_OK_AND_ASSIGN(plan, ExecPlan::Make());
  CountOptions options(CountOptions::ONLY_VALID);
  ASSERT_OK(
      Declaration::Sequence(
          {
              {"source",
               SourceNodeOptions{basic_data.schema,
                                 basic_data.gen(/*parallel=*/false, /*slow=*/false)}},
              {"filter", FilterNodeOptions{greater_equal(field_ref("i32"), literal(0))}},
              {"project", ProjectNodeOptions{{
                              field_ref("bool"),
                              call("multiply", {field_ref("i32"), literal(2)}),
                          }}},
              {"aggregate",
               AggregateNodeOptions{
                   /*aggregates=*/{{"hash_sum", nullptr}, {"hash_count", &options}},
                   /*targets=*/{"multiply(i32, 2)", "multiply(i32, 2)"},
                   /*names=*/{"sum(multiply(i32, 2))", "count(multiply(i32, 2))"},
                   /*keys=*/{"bool"}}},
              {"filter", FilterNodeOptions{greater(field_ref("sum(multiply(i32, 2))"),
                                                   literal(10))}},
              {"order_by_sink",
               OrderBySinkNodeOptions{
                   SortOptions({SortKey{"sum(multiply(i32, 2))", SortOrder::Ascending}}),
                   &sink_gen}},
          })
          .AddToPlan(plan.get()));
  EXPECT_EQ(plan->ToString(), R"a(ExecPlan with 6 nodes:
SourceNode{"source", outputs=["filter"]}
FilterNode{"filter", inputs=[target: "source"], outputs=["project"], filter=(i32 >= 0)}
ProjectNode{"project", inputs=[target: "filter"], outputs=["aggregate"], projection=[bool, multiply(i32, 2)]}
GroupByNode{"aggregate", inputs=[groupby: "project"], outputs=["filter"], keys=["bool"], aggregates=[
	hash_sum(multiply(i32, 2)),
	hash_count(multiply(i32, 2), {mode=NON_NULL}),
]}
FilterNode{"filter", inputs=[target: "aggregate"], outputs=["order_by_sink"], filter=(sum(multiply(i32, 2)) > 10)}
OrderBySinkNode{"order_by_sink", inputs=[collected: "filter"], by={sort_keys=[sum(multiply(i32, 2)) ASC], null_placement=AtEnd}}
)a");

  ASSERT_OK_AND_ASSIGN(plan, ExecPlan::Make());
  Declaration union_node{"union", ExecNodeOptions{}};
  Declaration lhs{"source",
                  SourceNodeOptions{basic_data.schema,
                                    basic_data.gen(/*parallel=*/false, /*slow=*/false)}};
  lhs.label = "lhs";
  Declaration rhs{"source",
                  SourceNodeOptions{basic_data.schema,
                                    basic_data.gen(/*parallel=*/false, /*slow=*/false)}};
  rhs.label = "rhs";
  union_node.inputs.emplace_back(lhs);
  union_node.inputs.emplace_back(rhs);
  ASSERT_OK(
      Declaration::Sequence(
          {
              union_node,
              {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"count", &options}},
                                                 /*targets=*/{"i32"},
                                                 /*names=*/{"count(i32)"},
                                                 /*keys=*/{}}},
              {"sink", SinkNodeOptions{&sink_gen}},
          })
          .AddToPlan(plan.get()));
  EXPECT_EQ(plan->ToString(), R"a(ExecPlan with 5 nodes:
SourceNode{"lhs", outputs=["union"]}
SourceNode{"rhs", outputs=["union"]}
UnionNode{"union", inputs=[input_0_label: "lhs", input_1_label: "rhs"], outputs=["aggregate"]}
ScalarAggregateNode{"aggregate", inputs=[target: "union"], outputs=["sink"], aggregates=[
	count(i32, {mode=NON_NULL}),
]}
SinkNode{"sink", inputs=[collected: "aggregate"]}
)a");
}

TEST(ExecPlanExecution, SourceOrderBy) {
  std::vector<ExecBatch> expected = {
      ExecBatchFromJSON({int32(), boolean()},
                        "[[4, false], [5, null], [6, false], [7, false], [null, true]]")};
  for (bool slow : {false, true}) {
    SCOPED_TRACE(slow ? "slowed" : "unslowed");

    for (bool parallel : {false, true}) {
      SCOPED_TRACE(parallel ? "parallel" : "single threaded");

      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
      AsyncGenerator<util::optional<ExecBatch>> sink_gen;

      auto basic_data = MakeBasicBatches();

      SortOptions options({SortKey("i32", SortOrder::Ascending)});
      ASSERT_OK(Declaration::Sequence(
                    {
                        {"source", SourceNodeOptions{basic_data.schema,
                                                     basic_data.gen(parallel, slow)}},
                        {"order_by_sink", OrderBySinkNodeOptions{options, &sink_gen}},
                    })
                    .AddToPlan(plan.get()));

      ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                  Finishes(ResultWith(ElementsAreArray(expected))));
    }
  }
}

TEST(ExecPlanExecution, SourceSinkError) {
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;

  auto basic_data = MakeBasicBatches();
  auto it = basic_data.batches.begin();
  AsyncGenerator<util::optional<ExecBatch>> error_source_gen =
      [&]() -> Result<util::optional<ExecBatch>> {
    if (it == basic_data.batches.end()) {
      return Status::Invalid("Artificial error");
    }
    return util::make_optional(*it++);
  };

  ASSERT_OK(Declaration::Sequence(
                {
                    {"source", SourceNodeOptions{basic_data.schema, error_source_gen}},
                    {"sink", SinkNodeOptions{&sink_gen}},
                })
                .AddToPlan(plan.get()));

  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
              Finishes(Raises(StatusCode::Invalid, HasSubstr("Artificial"))));
}

TEST(ExecPlanExecution, SourceConsumingSink) {
  for (bool slow : {false, true}) {
    SCOPED_TRACE(slow ? "slowed" : "unslowed");

    for (bool parallel : {false, true}) {
      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
      std::atomic<uint32_t> batches_seen{0};
      Future<> finish = Future<>::Make();
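      // Counts consumed batches and defers Finish() until the test marks `finish`,
      // so the plan cannot complete before the consumer acknowledges completion.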
      struct TestConsumer : public SinkNodeConsumer {
        TestConsumer(std::atomic<uint32_t>* batches_seen, Future<> finish)
            : batches_seen(batches_seen), finish(std::move(finish)) {}

        Status Consume(ExecBatch batch) override {
          (*batches_seen)++;
          return Status::OK();
        }

        Future<> Finish() override { return finish; }

        std::atomic<uint32_t>* batches_seen;
        Future<> finish;
      };
      std::shared_ptr<TestConsumer> consumer =
          std::make_shared<TestConsumer>(&batches_seen, finish);

      auto basic_data = MakeBasicBatches();
      ASSERT_OK_AND_ASSIGN(
          auto source, MakeExecNode("source", plan.get(), {},
                                    SourceNodeOptions(basic_data.schema,
                                                      basic_data.gen(parallel, slow))));
      ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source},
                             ConsumingSinkNodeOptions(consumer)));
      ASSERT_OK(plan->StartProducing());
      // Source should finish fairly quickly
      ASSERT_FINISHES_OK(source->finished());
      SleepABit();
      ASSERT_EQ(2, batches_seen);
      // Consumer isn't finished and so plan shouldn't have finished
      AssertNotFinished(plan->finished());
      // Mark consumption complete, plan should finish
      finish.MarkFinished();
      ASSERT_FINISHES_OK(plan->finished());
    }
  }
}

TEST(ExecPlanExecution, ConsumingSinkError) {
  struct ConsumeErrorConsumer : public SinkNodeConsumer {
    Status Consume(ExecBatch batch) override { return Status::Invalid("XYZ"); }
    Future<> Finish() override { return Future<>::MakeFinished(); }
  };
  struct FinishErrorConsumer : public SinkNodeConsumer {
    Status Consume(ExecBatch batch) override { return Status::OK(); }
    Future<> Finish() override { return Future<>::MakeFinished(Status::Invalid("XYZ")); }
  };
  std::vector<std::shared_ptr<SinkNodeConsumer>> consumers{
      std::make_shared<ConsumeErrorConsumer>(), std::make_shared<FinishErrorConsumer>()};

  for (auto& consumer : consumers) {
    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
    auto basic_data = MakeBasicBatches();
    ASSERT_OK(Declaration::Sequence(
                  {{"source",
                    SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))},
                   {"consuming_sink", ConsumingSinkNodeOptions(consumer)}})
                  .AddToPlan(plan.get()));
    ASSERT_OK_AND_ASSIGN(
        auto source,
        MakeExecNode("source", plan.get(), {},
                     SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))));
    ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source},
                           ConsumingSinkNodeOptions(consumer)));
    ASSERT_OK(plan->StartProducing());
    ASSERT_FINISHES_AND_RAISES(Invalid, plan->finished());
  }
}

TEST(ExecPlanExecution, ConsumingSinkErrorFinish) {
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  struct FinishErrorConsumer : public SinkNodeConsumer {
    Status Consume(ExecBatch batch) override { return Status::OK(); }
    Future<> Finish() override { return Future<>::MakeFinished(Status::Invalid("XYZ")); }
  };
  std::shared_ptr<FinishErrorConsumer> consumer = std::make_shared<FinishErrorConsumer>();

  auto basic_data = MakeBasicBatches();
  ASSERT_OK(
      Declaration::Sequence(
          {{"source", SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))},
           {"consuming_sink", ConsumingSinkNodeOptions(consumer)}})
          .AddToPlan(plan.get()));
  ASSERT_OK_AND_ASSIGN(
      auto source,
      MakeExecNode("source", plan.get(), {},
                   SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))));
  ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source},
                         ConsumingSinkNodeOptions(consumer)));
  ASSERT_OK(plan->StartProducing());
  ASSERT_FINISHES_AND_RAISES(Invalid, plan->finished());
}

TEST(ExecPlanExecution, StressSourceSink) {
  for (bool slow : {false, true}) {
    SCOPED_TRACE(slow ? "slowed" : "unslowed");

    for (bool parallel : {false, true}) {
      SCOPED_TRACE(parallel ? "parallel" : "single threaded");

      int num_batches = (slow && !parallel) ? 30 : 300;

      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
      AsyncGenerator<util::optional<ExecBatch>> sink_gen;

      auto random_data = MakeRandomBatches(
          schema({field("a", int32()), field("b", boolean())}), num_batches);

      ASSERT_OK(Declaration::Sequence(
                    {
                        {"source", SourceNodeOptions{random_data.schema,
                                                     random_data.gen(parallel, slow)}},
                        {"sink", SinkNodeOptions{&sink_gen}},
                    })
                    .AddToPlan(plan.get()));

      ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                  Finishes(ResultWith(UnorderedElementsAreArray(random_data.batches))));
    }
  }
}

TEST(ExecPlanExecution, StressSourceOrderBy) {
  auto input_schema = schema({field("a", int32()), field("b", boolean())});
  for (bool slow : {false, true}) {
    SCOPED_TRACE(slow ? "slowed" : "unslowed");

    for (bool parallel : {false, true}) {
      SCOPED_TRACE(parallel ? "parallel" : "single threaded");

      int num_batches = (slow && !parallel) ? 30 : 300;

      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
      AsyncGenerator<util::optional<ExecBatch>> sink_gen;

      auto random_data = MakeRandomBatches(input_schema, num_batches);

      SortOptions options({SortKey("a", SortOrder::Ascending)});
      ASSERT_OK(Declaration::Sequence(
                    {
                        {"source", SourceNodeOptions{random_data.schema,
                                                     random_data.gen(parallel, slow)}},
                        {"order_by_sink", OrderBySinkNodeOptions{options, &sink_gen}},
                    })
                    .AddToPlan(plan.get()));

      // Check that data is sorted appropriately
      ASSERT_FINISHES_OK_AND_ASSIGN(auto exec_batches,
                                    StartAndCollect(plan.get(), sink_gen));
      ASSERT_OK_AND_ASSIGN(auto actual, TableFromExecBatches(input_schema, exec_batches));
      ASSERT_OK_AND_ASSIGN(auto original,
                           TableFromExecBatches(input_schema, random_data.batches));
      ASSERT_OK_AND_ASSIGN(auto sort_indices, SortIndices(original, options));
      ASSERT_OK_AND_ASSIGN(auto expected, Take(original, sort_indices));
      AssertTablesEqual(*actual, *expected.table());
    }
  }
}

TEST(ExecPlanExecution, StressSourceGroupedSumStop) {
  auto input_schema = schema({field("a", int32()), field("b", boolean())});
  for (bool slow : {false, true}) {
    SCOPED_TRACE(slow ? "slowed" : "unslowed");

    for (bool parallel : {false, true}) {
      SCOPED_TRACE(parallel ? "parallel" : "single threaded");

      int num_batches = (slow && !parallel) ? 30 : 300;

      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
      AsyncGenerator<util::optional<ExecBatch>> sink_gen;

      auto random_data = MakeRandomBatches(input_schema, num_batches);

      SortOptions options({SortKey("a", SortOrder::Ascending)});
      ASSERT_OK(Declaration::Sequence(
                    {
                        {"source", SourceNodeOptions{random_data.schema,
                                                     random_data.gen(parallel, slow)}},
                        {"aggregate",
                         AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
                                              /*targets=*/{"a"}, /*names=*/{"sum(a)"},
                                              /*keys=*/{"b"}}},
                        {"sink", SinkNodeOptions{&sink_gen}},
                    })
                    .AddToPlan(plan.get()));

      ASSERT_OK(plan->Validate());
      ASSERT_OK(plan->StartProducing());
      plan->StopProducing();
      ASSERT_FINISHES_OK(plan->finished());
    }
  }
}

TEST(ExecPlanExecution, StressSourceSinkStopped) {
  for (bool slow : {false, true}) {
    SCOPED_TRACE(slow ? "slowed" : "unslowed");

    for (bool parallel : {false, true}) {
      SCOPED_TRACE(parallel ? "parallel" : "single threaded");

      int num_batches = (slow && !parallel) ? 30 : 300;

      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
      AsyncGenerator<util::optional<ExecBatch>> sink_gen;

      auto random_data = MakeRandomBatches(
          schema({field("a", int32()), field("b", boolean())}), num_batches);

      ASSERT_OK(Declaration::Sequence(
                    {
                        {"source", SourceNodeOptions{random_data.schema,
                                                     random_data.gen(parallel, slow)}},
                        {"sink", SinkNodeOptions{&sink_gen}},
                    })
                    .AddToPlan(plan.get()));

      ASSERT_OK(plan->Validate());
      ASSERT_OK(plan->StartProducing());

      EXPECT_THAT(sink_gen(), Finishes(ResultWith(Optional(random_data.batches[0]))));

      plan->StopProducing();
      ASSERT_THAT(plan->finished(), Finishes(Ok()));
    }
  }
}

TEST(ExecPlanExecution, SourceFilterSink) {
  auto basic_data = MakeBasicBatches();

  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;

  ASSERT_OK(Declaration::Sequence(
                {
                    {"source", SourceNodeOptions{basic_data.schema,
                                                 basic_data.gen(/*parallel=*/false,
                                                                /*slow=*/false)}},
                    {"filter", FilterNodeOptions{equal(field_ref("i32"), literal(6))}},
                    {"sink", SinkNodeOptions{&sink_gen}},
                })
                .AddToPlan(plan.get()));

  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
              Finishes(ResultWith(UnorderedElementsAreArray(
                  {ExecBatchFromJSON({int32(), boolean()}, "[]"),
                   ExecBatchFromJSON({int32(), boolean()}, "[[6, false]]")}))));
}

TEST(ExecPlanExecution, SourceProjectSink) {
  auto basic_data = MakeBasicBatches();

  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;

  ASSERT_OK(Declaration::Sequence(
                {
                    {"source", SourceNodeOptions{basic_data.schema,
                                                 basic_data.gen(/*parallel=*/false,
                                                                /*slow=*/false)}},
                    {"project",
                     ProjectNodeOptions{{
                                            not_(field_ref("bool")),
                                            call("add", {field_ref("i32"), literal(1)}),
                                        },
                                        {"!bool", "i32 + 1"}}},
                    {"sink", SinkNodeOptions{&sink_gen}},
                })
                .AddToPlan(plan.get()));

  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
              Finishes(ResultWith(UnorderedElementsAreArray(
                  {ExecBatchFromJSON({boolean(), int32()}, "[[false, null], [true, 5]]"),
                   ExecBatchFromJSON({boolean(), int32()},
                                     "[[null, 6], [true, 7], [true, 8]]")}))));
}

namespace {

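// Produces three small (int32, utf8) batches with repeated group keys ("alfa",
// "beta", "gama"); `multiplicity` duplicates them so parallel runs see enough batches.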
BatchesWithSchema MakeGroupableBatches(int multiplicity = 1) {
  BatchesWithSchema out;

  out.batches = {ExecBatchFromJSON({int32(), utf8()}, R"([
                   [12, "alfa"],
                   [7,  "beta"],
                   [3,  "alfa"]
                 ])"),
                 ExecBatchFromJSON({int32(), utf8()}, R"([
                   [-2, "alfa"],
                   [-1, "gama"],
                   [3,  "alfa"]
                 ])"),
                 ExecBatchFromJSON({int32(), utf8()}, R"([
                   [5,  "gama"],
                   [3,  "beta"],
                   [-8, "alfa"]
                 ])")};

  size_t batch_count = out.batches.size();
  for (int repeat = 1; repeat < multiplicity; ++repeat) {
    for (size_t i = 0; i < batch_count; ++i) {
      out.batches.push_back(out.batches[i]);
    }
  }

  out.schema = schema({field("i32", int32()), field("str", utf8())});

  return out;
}

}  // namespace

TEST(ExecPlanExecution, SourceGroupedSum) {
  for (bool parallel : {false, true}) {
    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");

    auto input = MakeGroupableBatches(/*multiplicity=*/parallel ? 100 : 1);

    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
    AsyncGenerator<util::optional<ExecBatch>> sink_gen;

    ASSERT_OK(Declaration::Sequence(
                  {
                      {"source", SourceNodeOptions{input.schema,
                                                   input.gen(parallel, /*slow=*/false)}},
                      {"aggregate",
                       AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
                                            /*targets=*/{"i32"}, /*names=*/{"sum(i32)"},
                                            /*keys=*/{"str"}}},
                      {"sink", SinkNodeOptions{&sink_gen}},
                  })
                  .AddToPlan(plan.get()));

    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                Finishes(ResultWith(UnorderedElementsAreArray({ExecBatchFromJSON(
                    {int64(), utf8()},
                    parallel ? R"([[800, "alfa"], [1000, "beta"], [400, "gama"]])"
                             : R"([[8, "alfa"], [10, "beta"], [4, "gama"]])")}))));
  }
}

TEST(ExecPlanExecution, SourceFilterProjectGroupedSumFilter) {
  for (bool parallel : {false, true}) {
    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");

    int batch_multiplicity = parallel ? 100 : 1;
    auto input = MakeGroupableBatches(/*multiplicity=*/batch_multiplicity);

    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
    AsyncGenerator<util::optional<ExecBatch>> sink_gen;

    ASSERT_OK(
        Declaration::Sequence(
            {
                {"source",
                 SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}},
                {"filter",
                 FilterNodeOptions{greater_equal(field_ref("i32"), literal(0))}},
                {"project", ProjectNodeOptions{{
                                field_ref("str"),
                                call("multiply", {field_ref("i32"), literal(2)}),
                            }}},
                {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
                                                   /*targets=*/{"multiply(i32, 2)"},
                                                   /*names=*/{"sum(multiply(i32, 2))"},
                                                   /*keys=*/{"str"}}},
                {"filter", FilterNodeOptions{greater(field_ref("sum(multiply(i32, 2))"),
                                                     literal(10 * batch_multiplicity))}},
                {"sink", SinkNodeOptions{&sink_gen}},
            })
            .AddToPlan(plan.get()));

    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                Finishes(ResultWith(UnorderedElementsAreArray({ExecBatchFromJSON(
                    {int64(), utf8()}, parallel ? R"([[3600, "alfa"], [2000, "beta"]])"
                                                : R"([[36, "alfa"], [20, "beta"]])")}))));
  }
}

TEST(ExecPlanExecution, SourceFilterProjectGroupedSumOrderBy) {
  for (bool parallel : {false, true}) {
    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");

    int batch_multiplicity = parallel ? 100 : 1;
    auto input = MakeGroupableBatches(/*multiplicity=*/batch_multiplicity);

    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
    AsyncGenerator<util::optional<ExecBatch>> sink_gen;

    SortOptions options({SortKey("str", SortOrder::Descending)});
    ASSERT_OK(
        Declaration::Sequence(
            {
                {"source",
                 SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}},
                {"filter",
                 FilterNodeOptions{greater_equal(field_ref("i32"), literal(0))}},
                {"project", ProjectNodeOptions{{
                                field_ref("str"),
                                call("multiply", {field_ref("i32"), literal(2)}),
                            }}},
                {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
                                                   /*targets=*/{"multiply(i32, 2)"},
                                                   /*names=*/{"sum(multiply(i32, 2))"},
                                                   /*keys=*/{"str"}}},
                {"filter", FilterNodeOptions{greater(field_ref("sum(multiply(i32, 2))"),
                                                     literal(10 * batch_multiplicity))}},
                {"order_by_sink", OrderBySinkNodeOptions{options, &sink_gen}},
            })
            .AddToPlan(plan.get()));

    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                Finishes(ResultWith(ElementsAreArray({ExecBatchFromJSON(
                    {int64(), utf8()}, parallel ? R"([[2000, "beta"], [3600, "alfa"]])"
                                                : R"([[20, "beta"], [36, "alfa"]])")}))));
  }
}

TEST(ExecPlanExecution, SourceFilterProjectGroupedSumTopK) {
  for (bool parallel : {false, true}) {
    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");

    int batch_multiplicity = parallel ? 100 : 1;
    auto input = MakeGroupableBatches(/*multiplicity=*/batch_multiplicity);

    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
    AsyncGenerator<util::optional<ExecBatch>> sink_gen;

    SelectKOptions options = SelectKOptions::TopKDefault(/*k=*/1, {"str"});
    ASSERT_OK(
        Declaration::Sequence(
            {
                {"source",
                 SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}},
                {"project", ProjectNodeOptions{{
                                field_ref("str"),
                                call("multiply", {field_ref("i32"), literal(2)}),
                            }}},
                {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
                                                   /*targets=*/{"multiply(i32, 2)"},
                                                   /*names=*/{"sum(multiply(i32, 2))"},
                                                   /*keys=*/{"str"}}},
                {"select_k_sink", SelectKSinkNodeOptions{options, &sink_gen}},
            })
            .AddToPlan(plan.get()));

    ASSERT_THAT(
        StartAndCollect(plan.get(), sink_gen),
        Finishes(ResultWith(ElementsAreArray({ExecBatchFromJSON(
            {int64(), utf8()}, parallel ? R"([[800, "gama"]])" : R"([[8, "gama"]])")}))));
  }
}

TEST(ExecPlanExecution, SourceScalarAggSink) {
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;

  auto basic_data = MakeBasicBatches();

  ASSERT_OK(Declaration::Sequence(
                {
                    {"source", SourceNodeOptions{basic_data.schema,
                                                 basic_data.gen(/*parallel=*/false,
                                                                /*slow=*/false)}},
                    {"aggregate", AggregateNodeOptions{
                                      /*aggregates=*/{{"sum", nullptr}, {"any", nullptr}},
                                      /*targets=*/{"i32", "bool"},
                                      /*names=*/{"sum(i32)", "any(bool)"}}},
                    {"sink", SinkNodeOptions{&sink_gen}},
                })
                .AddToPlan(plan.get()));

  ASSERT_THAT(
      StartAndCollect(plan.get(), sink_gen),
      Finishes(ResultWith(UnorderedElementsAreArray({
          ExecBatchFromJSON({ValueDescr::Scalar(int64()), ValueDescr::Scalar(boolean())},
                            "[[22, true]]"),
      }))));
}

TEST(ExecPlanExecution, AggregationPreservesOptions) {
  // ARROW-13638: aggregation nodes initialize per-thread kernel state lazily
  // and need to keep a copy/strong reference to function options
  {
    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
    AsyncGenerator<util::optional<ExecBatch>> sink_gen;

    auto basic_data = MakeBasicBatches();

    {
      auto options = std::make_shared<TDigestOptions>(TDigestOptions::Defaults());
      ASSERT_OK(Declaration::Sequence(
                    {
                        {"source", SourceNodeOptions{basic_data.schema,
                                                     basic_data.gen(/*parallel=*/false,
                                                                    /*slow=*/false)}},
                        {"aggregate",
                         AggregateNodeOptions{/*aggregates=*/{{"tdigest", options.get()}},
                                              /*targets=*/{"i32"},
                                              /*names=*/{"tdigest(i32)"}}},
                        {"sink", SinkNodeOptions{&sink_gen}},
                    })
                    .AddToPlan(plan.get()));
    }

    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                Finishes(ResultWith(UnorderedElementsAreArray({
                    ExecBatchFromJSON({ValueDescr::Array(float64())}, "[[5.5]]"),
                }))));
  }
  {
    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
    AsyncGenerator<util::optional<ExecBatch>> sink_gen;

    auto data = MakeGroupableBatches(/*multiplicity=*/100);

    {
      auto options = std::make_shared<CountOptions>(CountOptions::Defaults());
      ASSERT_OK(
          Declaration::Sequence(
              {
                  {"source", SourceNodeOptions{data.schema, data.gen(/*parallel=*/false,
                                                                     /*slow=*/false)}},
                  {"aggregate",
                   AggregateNodeOptions{/*aggregates=*/{{"hash_count", options.get()}},
                                        /*targets=*/{"i32"},
                                        /*names=*/{"count(i32)"},
                                        /*keys=*/{"str"}}},
                  {"sink", SinkNodeOptions{&sink_gen}},
              })
              .AddToPlan(plan.get()));
    }

    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                Finishes(ResultWith(UnorderedElementsAreArray({
                    ExecBatchFromJSON({int64(), utf8()},
                                      R"([[500, "alfa"], [200, "beta"], [200, "gama"]])"),
                }))));
  }
}

TEST(ExecPlanExecution, ScalarSourceScalarAggSink) {
  // ARROW-9056: scalar aggregation can be done over scalars, taking
  // into account batch.length > 1 (e.g. a partition column)
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;

  BatchesWithSchema scalar_data;
  scalar_data.batches = {
      ExecBatchFromJSON({ValueDescr::Scalar(int32()), ValueDescr::Scalar(boolean())},
                        "[[5, false], [5, false], [5, false]]"),
      ExecBatchFromJSON({int32(), boolean()}, "[[5, true], [6, false], [7, true]]")};
  scalar_data.schema = schema({field("a", int32()), field("b", boolean())});

  // index can't be tested as it's order-dependent
  // mode/quantile can't be tested as they're technically vector kernels
  ASSERT_OK(
      Declaration::Sequence(
          {
              {"source",
               SourceNodeOptions{scalar_data.schema, scalar_data.gen(/*parallel=*/false,
                                                                     /*slow=*/false)}},
              {"aggregate", AggregateNodeOptions{
                                /*aggregates=*/{{"all", nullptr},
                                                {"any", nullptr},
                                                {"count", nullptr},
                                                {"mean", nullptr},
                                                {"product", nullptr},
                                                {"stddev", nullptr},
                                                {"sum", nullptr},
                                                {"tdigest", nullptr},
                                                {"variance", nullptr}},
                                /*targets=*/{"b", "b", "a", "a", "a", "a", "a", "a", "a"},
                                /*names=*/
                                {"all(b)", "any(b)", "count(a)", "mean(a)", "product(a)",
                                 "stddev(a)", "sum(a)", "tdigest(a)", "variance(a)"}}},
              {"sink", SinkNodeOptions{&sink_gen}},
          })
          .AddToPlan(plan.get()));

  ASSERT_THAT(
      StartAndCollect(plan.get(), sink_gen),
      Finishes(ResultWith(UnorderedElementsAreArray({
          ExecBatchFromJSON(
              {ValueDescr::Scalar(boolean()), ValueDescr::Scalar(boolean()),
               ValueDescr::Scalar(int64()), ValueDescr::Scalar(float64()),
               ValueDescr::Scalar(int64()), ValueDescr::Scalar(float64()),
               ValueDescr::Scalar(int64()), ValueDescr::Array(float64()),
               ValueDescr::Scalar(float64())},
              R"([[false, true, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0, 0.5833333333333334]])"),
      }))));
}

TEST(ExecPlanExecution, ScalarSourceGroupedSum) {
  // ARROW-14630: ensure grouped aggregation with a scalar key/array input doesn't error
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;

  BatchesWithSchema scalar_data;
  scalar_data.batches = {
      ExecBatchFromJSON({int32(), ValueDescr::Scalar(boolean())},
                        "[[5, false], [6, false], [7, false]]"),
      ExecBatchFromJSON({int32(), ValueDescr::Scalar(boolean())},
                        "[[1, true], [2, true], [3, true]]"),
  };
  scalar_data.schema = schema({field("a", int32()), field("b", boolean())});

  SortOptions options({SortKey("b", SortOrder::Descending)});
  ASSERT_OK(Declaration::Sequence(
                {
                    {"source", SourceNodeOptions{scalar_data.schema,
                                                 scalar_data.gen(/*parallel=*/false,
                                                                 /*slow=*/false)}},
                    {"aggregate",
                     AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
                                          /*targets=*/{"a"}, /*names=*/{"hash_sum(a)"},
                                          /*keys=*/{"b"}}},
                    {"order_by_sink", OrderBySinkNodeOptions{options, &sink_gen}},
                })
                .AddToPlan(plan.get()));

  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
              Finishes(ResultWith(UnorderedElementsAreArray({
                  ExecBatchFromJSON({int64(), boolean()}, R"([[6, true], [18, false]])"),
              }))));
}

TEST(ExecPlanExecution, SelfInnerHashJoinSink) {
  for (bool parallel : {false, true}) {
    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");

    auto input = MakeGroupableBatches();

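    // When parallel, run the plan on the CPU thread pool; a null executor keeps
    // execution serial.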
    auto exec_ctx = arrow::internal::make_unique<ExecContext>(
        default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr);

    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get()));
    AsyncGenerator<util::optional<ExecBatch>> sink_gen;

    ExecNode* left_source;
    ExecNode* right_source;
    for (auto source : {&left_source, &right_source}) {
      ASSERT_OK_AND_ASSIGN(
          *source, MakeExecNode("source", plan.get(), {},
                                SourceNodeOptions{input.schema,
                                                  input.gen(parallel, /*slow=*/false)}));
    }
    ASSERT_OK_AND_ASSIGN(
        auto left_filter,
        MakeExecNode("filter", plan.get(), {left_source},
                     FilterNodeOptions{greater_equal(field_ref("i32"), literal(-1))}));
    ASSERT_OK_AND_ASSIGN(
        auto right_filter,
        MakeExecNode("filter", plan.get(), {right_source},
                     FilterNodeOptions{less_equal(field_ref("i32"), literal(2))}));

    // left side: [3,  "alfa"], [3,  "alfa"], [12, "alfa"], [3,  "beta"], [7,  "beta"],
    // [-1, "gama"], [5,  "gama"]
    // right side: [-2, "alfa"], [-8, "alfa"], [-1, "gama"]

    HashJoinNodeOptions join_opts{JoinType::INNER,
                                  /*left_keys=*/{"str"},
                                  /*right_keys=*/{"str"}, "l_", "r_"};

    ASSERT_OK_AND_ASSIGN(
        auto hashjoin,
        MakeExecNode("hashjoin", plan.get(), {left_filter, right_filter}, join_opts));

    ASSERT_OK_AND_ASSIGN(std::ignore, MakeExecNode("sink", plan.get(), {hashjoin},
                                                   SinkNodeOptions{&sink_gen}));

    ASSERT_FINISHES_OK_AND_ASSIGN(auto result, StartAndCollect(plan.get(), sink_gen));

    std::vector<ExecBatch> expected = {
        ExecBatchFromJSON({int32(), utf8(), int32(), utf8()}, R"([
            [3, "alfa", -2, "alfa"], [3, "alfa", -8, "alfa"],
            [3, "alfa", -2, "alfa"], [3, "alfa", -8, "alfa"],
            [12, "alfa", -2, "alfa"], [12, "alfa", -8, "alfa"],
            [-1, "gama", -1, "gama"], [5, "gama", -1, "gama"]])")};

    AssertExecBatchesEqual(hashjoin->output_schema(), result, expected);
  }
}

TEST(ExecPlanExecution, SelfOuterHashJoinSink) {
  for (bool parallel : {false, true}) {
    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");

    auto input = MakeGroupableBatches();

    auto exec_ctx = arrow::internal::make_unique<ExecContext>(
        default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr);

    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get()));
    AsyncGenerator<util::optional<ExecBatch>> sink_gen;

    ExecNode* left_source;
    ExecNode* right_source;
    for (auto source : {&left_source, &right_source}) {
      ASSERT_OK_AND_ASSIGN(
          *source, MakeExecNode("source", plan.get(), {},
                                SourceNodeOptions{input.schema,
                                                  input.gen(parallel, /*slow=*/false)}));
    }
    ASSERT_OK_AND_ASSIGN(
        auto left_filter,
        MakeExecNode("filter", plan.get(), {left_source},
                     FilterNodeOptions{greater_equal(field_ref("i32"), literal(-1))}));
    ASSERT_OK_AND_ASSIGN(
        auto right_filter,
        MakeExecNode("filter", plan.get(), {right_source},
                     FilterNodeOptions{less_equal(field_ref("i32"), literal(2))}));

    // left side: [3,  "alfa"], [3,  "alfa"], [12, "alfa"], [3,  "beta"], [7,  "beta"],
    // [-1, "gama"], [5,  "gama"]
    // right side: [-2, "alfa"], [-8, "alfa"], [-1, "gama"]

    HashJoinNodeOptions join_opts{JoinType::FULL_OUTER,
                                  /*left_keys=*/{"str"},
                                  /*right_keys=*/{"str"}, "l_", "r_"};

    ASSERT_OK_AND_ASSIGN(
        auto hashjoin,
        MakeExecNode("hashjoin", plan.get(), {left_filter, right_filter}, join_opts));

    ASSERT_OK_AND_ASSIGN(std::ignore, MakeExecNode("sink", plan.get(), {hashjoin},
                                                   SinkNodeOptions{&sink_gen}));

    ASSERT_FINISHES_OK_AND_ASSIGN(auto result, StartAndCollect(plan.get(), sink_gen));

    std::vector<ExecBatch> expected = {
        ExecBatchFromJSON({int32(), utf8(), int32(), utf8()}, R"([
            [3, "alfa", -2, "alfa"], [3, "alfa", -8, "alfa"],
            [3, "alfa", -2, "alfa"], [3, "alfa", -8, "alfa"],
            [12, "alfa", -2, "alfa"], [12, "alfa", -8, "alfa"],
            [3,  "beta", null, null], [7,  "beta", null, null],
            [-1, "gama", -1, "gama"], [5, "gama", -1, "gama"]])")};

    AssertExecBatchesEqual(hashjoin->output_schema(), result, expected);
  }
}

TEST(ExecPlan, RecordBatchReaderSourceSink) {
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;

  // set up a RecordBatchReader:
  auto input = MakeBasicBatches();

  RecordBatchVector batches;
  for (const ExecBatch& exec_batch : input.batches) {
    ASSERT_OK_AND_ASSIGN(auto batch, exec_batch.ToRecordBatch(input.schema));
    batches.push_back(batch);
  }

  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));
  std::shared_ptr<RecordBatchReader> reader = std::make_shared<TableBatchReader>(*table);

  // Map the RecordBatchReader to a SourceNode
  ASSERT_OK_AND_ASSIGN(
      auto batch_gen,
      MakeReaderGenerator(std::move(reader), arrow::io::internal::GetIOThreadPool()));

  ASSERT_OK(
      Declaration::Sequence({
                                {"source", SourceNodeOptions{table->schema(), batch_gen}},
                                {"sink", SinkNodeOptions{&sink_gen}},
                            })
          .AddToPlan(plan.get()));

  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
              Finishes(ResultWith(UnorderedElementsAreArray(input.batches))));
}

}  // namespace compute
}  // namespace arrow