1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include <gmock/gmock-matchers.h>
19
20 #include <functional>
21 #include <memory>
22
23 #include "arrow/compute/exec.h"
24 #include "arrow/compute/exec/exec_plan.h"
25 #include "arrow/compute/exec/expression.h"
26 #include "arrow/compute/exec/options.h"
27 #include "arrow/compute/exec/test_util.h"
28 #include "arrow/compute/exec/util.h"
29 #include "arrow/io/util_internal.h"
30 #include "arrow/record_batch.h"
31 #include "arrow/table.h"
32 #include "arrow/testing/future_util.h"
33 #include "arrow/testing/gtest_util.h"
34 #include "arrow/testing/matchers.h"
35 #include "arrow/testing/random.h"
36 #include "arrow/util/async_generator.h"
37 #include "arrow/util/logging.h"
38 #include "arrow/util/make_unique.h"
39 #include "arrow/util/thread_pool.h"
40 #include "arrow/util/vector.h"
41
42 using testing::ElementsAre;
43 using testing::ElementsAreArray;
44 using testing::HasSubstr;
45 using testing::Optional;
46 using testing::UnorderedElementsAreArray;
47
48 namespace arrow {
49
50 namespace compute {
51
TEST(ExecPlanConstruction, Empty) {
  // A plan containing no nodes at all must fail validation.
  ASSERT_OK_AND_ASSIGN(auto empty_plan, ExecPlan::Make());
  ASSERT_THAT(empty_plan->Validate(), Raises(StatusCode::Invalid));
}
57
TEST(ExecPlanConstruction, SingleNode) {
  // A lone node with neither inputs nor outputs is simultaneously the plan's
  // only source and its only sink.
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  auto standalone = MakeDummyNode(plan.get(), "dummy", /*inputs=*/{}, /*num_outputs=*/0);
  ASSERT_OK(plan->Validate());
  ASSERT_THAT(plan->sources(), ElementsAre(standalone));
  ASSERT_THAT(plan->sinks(), ElementsAre(standalone));

  // A node declaring an output that nothing consumes makes the plan invalid.
  ASSERT_OK_AND_ASSIGN(plan, ExecPlan::Make());
  standalone = MakeDummyNode(plan.get(), "dummy", /*inputs=*/{}, /*num_outputs=*/1);
  // Output not bound
  ASSERT_THAT(plan->Validate(), Raises(StatusCode::Invalid));
}
70
TEST(ExecPlanConstruction, SourceSink) {
  // Two chained nodes: the head is the sole source, the tail the sole sink.
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  auto head = MakeDummyNode(plan.get(), "source", /*inputs=*/{}, /*num_outputs=*/1);
  auto tail = MakeDummyNode(plan.get(), "sink", /*inputs=*/{head}, /*num_outputs=*/0);

  ASSERT_OK(plan->Validate());
  EXPECT_THAT(plan->sources(), ElementsAre(head));
  EXPECT_THAT(plan->sinks(), ElementsAre(tail));
}
80
TEST(ExecPlanConstruction, MultipleNode) {
  // Builds a small DAG (two sources feeding three processing stages and one
  // sink) and verifies that sources()/sinks() report exactly the boundary
  // nodes.
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());

  auto source1 = MakeDummyNode(plan.get(), "source1", /*inputs=*/{}, /*num_outputs=*/2);

  auto source2 = MakeDummyNode(plan.get(), "source2", /*inputs=*/{}, /*num_outputs=*/1);

  auto process1 =
      MakeDummyNode(plan.get(), "process1", /*inputs=*/{source1}, /*num_outputs=*/2);

  // Fixed copy-paste typo: this node was previously labeled "process1" as well.
  auto process2 = MakeDummyNode(plan.get(), "process2", /*inputs=*/{source1, source2},
                                /*num_outputs=*/1);

  // `process1` appears twice: both of its outputs are consumed here.
  auto process3 =
      MakeDummyNode(plan.get(), "process3", /*inputs=*/{process1, process2, process1},
                    /*num_outputs=*/1);

  auto sink = MakeDummyNode(plan.get(), "sink", /*inputs=*/{process3}, /*num_outputs=*/0);

  ASSERT_OK(plan->Validate());
  ASSERT_THAT(plan->sources(), ElementsAre(source1, source2));
  ASSERT_THAT(plan->sinks(), ElementsAre(sink));
}
104
TEST(ExecPlanConstruction, AutoLabel) {
  // Nodes created with an empty label receive an auto-generated one, while an
  // explicit label is preserved.  The generated labels observed here are "0"
  // and "2", which suggests they are derived from the node's creation index
  // (the explicitly labeled node still occupies slot "1") -- TODO confirm
  // against the ExecPlan/MakeDummyNode implementation.
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  auto source1 = MakeDummyNode(plan.get(), "", /*inputs=*/{}, /*num_outputs=*/2);
  auto source2 =
      MakeDummyNode(plan.get(), "some_label", /*inputs=*/{}, /*num_outputs=*/1);
  auto source3 = MakeDummyNode(plan.get(), "", /*inputs=*/{}, /*num_outputs=*/2);

  ASSERT_EQ("0", source1->label());
  ASSERT_EQ("some_label", source2->label());
  ASSERT_EQ("2", source3->label());
}
116
// Records the order in which nodes are started and stopped, so the tests
// below can assert on plan startup/shutdown sequencing.
struct StartStopTracker {
  std::vector<std::string> started, stopped;

  // Returns a callback that appends the node's label to `started` and then
  // returns `st`, allowing a test to simulate a node that fails to start.
  StartProducingFunc start_producing_func(Status st = Status::OK()) {
    return [this, st](ExecNode* node) {
      started.push_back(node->label());
      return st;
    };
  }

  // Returns a callback that appends the node's label to `stopped`.
  StopProducingFunc stop_producing_func() {
    return [this](ExecNode* node) { stopped.push_back(node->label()); };
  }
};
131
TEST(ExecPlan, DummyStartProducing) {
  // Verifies that StartProducing() visits nodes in reverse topological order
  // (sinks before sources), that StopProducing() visits them in topological
  // order (sources before sinks), and that a stopped plan cannot be restarted.
  StartStopTracker t;

  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());

  auto source1 = MakeDummyNode(plan.get(), "source1", /*inputs=*/{}, /*num_outputs=*/2,
                               t.start_producing_func(), t.stop_producing_func());

  auto source2 = MakeDummyNode(plan.get(), "source2", /*inputs=*/{}, /*num_outputs=*/1,
                               t.start_producing_func(), t.stop_producing_func());

  auto process1 =
      MakeDummyNode(plan.get(), "process1", /*inputs=*/{source1}, /*num_outputs=*/2,
                    t.start_producing_func(), t.stop_producing_func());

  auto process2 =
      MakeDummyNode(plan.get(), "process2", /*inputs=*/{process1, source2},
                    /*num_outputs=*/1, t.start_producing_func(), t.stop_producing_func());

  auto process3 =
      MakeDummyNode(plan.get(), "process3", /*inputs=*/{process1, source1, process2},
                    /*num_outputs=*/1, t.start_producing_func(), t.stop_producing_func());

  MakeDummyNode(plan.get(), "sink", /*inputs=*/{process3}, /*num_outputs=*/0,
                t.start_producing_func(), t.stop_producing_func());

  ASSERT_OK(plan->Validate());
  // Nothing has been started or stopped before StartProducing().
  ASSERT_EQ(t.started.size(), 0);
  ASSERT_EQ(t.stopped.size(), 0);

  ASSERT_OK(plan->StartProducing());
  // Note that any correct reverse topological order may do
  ASSERT_THAT(t.started, ElementsAre("sink", "process3", "process2", "process1",
                                     "source2", "source1"));

  plan->StopProducing();
  ASSERT_THAT(plan->finished(), Finishes(Ok()));
  // Note that any correct topological order may do
  ASSERT_THAT(t.stopped, ElementsAre("source1", "source2", "process1", "process2",
                                     "process3", "sink"));

  // A plan that has been stopped refuses to start again.
  ASSERT_THAT(plan->StartProducing(),
              Raises(StatusCode::Invalid, HasSubstr("restarted")));
}
176
TEST(ExecPlan, DummyStartProducingError) {
  // When a node fails to start, StartProducing() must surface the error and
  // stop -- in reverse start order -- only the nodes that had already started.
  // (The parameter comments below previously said /*num_inputs=*/ although the
  // argument is the list of input nodes; corrected to /*inputs=*/.)
  StartStopTracker t;

  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  // This source reports NotImplemented on start, but is never reached:
  // `process1` fails first, since nodes are started sinks-first.
  auto source1 = MakeDummyNode(
      plan.get(), "source1", /*inputs=*/{}, /*num_outputs=*/2,
      t.start_producing_func(Status::NotImplemented("zzz")), t.stop_producing_func());

  auto source2 =
      MakeDummyNode(plan.get(), "source2", /*inputs=*/{}, /*num_outputs=*/1,
                    t.start_producing_func(), t.stop_producing_func());

  // This node raises IOError when started.
  auto process1 = MakeDummyNode(
      plan.get(), "process1", /*inputs=*/{source1}, /*num_outputs=*/2,
      t.start_producing_func(Status::IOError("xxx")), t.stop_producing_func());

  auto process2 =
      MakeDummyNode(plan.get(), "process2", /*inputs=*/{process1, source2},
                    /*num_outputs=*/1, t.start_producing_func(), t.stop_producing_func());

  auto process3 =
      MakeDummyNode(plan.get(), "process3", /*inputs=*/{process1, source1, process2},
                    /*num_outputs=*/1, t.start_producing_func(), t.stop_producing_func());

  MakeDummyNode(plan.get(), "sink", /*inputs=*/{process3}, /*num_outputs=*/0,
                t.start_producing_func(), t.stop_producing_func());

  ASSERT_OK(plan->Validate());
  ASSERT_EQ(t.started.size(), 0);
  ASSERT_EQ(t.stopped.size(), 0);

  // `process1` raises IOError
  ASSERT_THAT(plan->StartProducing(), Raises(StatusCode::IOError));
  ASSERT_THAT(t.started, ElementsAre("sink", "process3", "process2", "process1"));
  // Nodes that started successfully were stopped in reverse order
  ASSERT_THAT(t.stopped, ElementsAre("process2", "process3", "sink"));
}
214
TEST(ExecPlanExecution, SourceSink) {
  // A minimal source->sink plan must deliver every input batch (in any order)
  // under all combinations of slow and parallel batch generation.
  for (bool slow : {false, true}) {
    SCOPED_TRACE(slow ? "slowed" : "unslowed");

    for (bool parallel : {false, true}) {
      SCOPED_TRACE(parallel ? "parallel" : "single threaded");

      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
      AsyncGenerator<util::optional<ExecBatch>> collected_gen;

      auto input = MakeBasicBatches();

      Declaration source_decl{"source",
                              SourceNodeOptions{input.schema, input.gen(parallel, slow)}};
      Declaration sink_decl{"sink", SinkNodeOptions{&collected_gen}};
      ASSERT_OK(Declaration::Sequence({source_decl, sink_decl}).AddToPlan(plan.get()));

      ASSERT_THAT(StartAndCollect(plan.get(), collected_gen),
                  Finishes(ResultWith(UnorderedElementsAreArray(input.batches))));
    }
  }
}
240
TEST(ExecPlanExecution, SinkNodeBackpressure) {
  // Exercises the sink node's backpressure toggle: the source may queue up to
  // kPauseIfAbove batches before the toggle closes (signalling the source to
  // pause), and the toggle reopens once the queue drains below kResumeIfBelow.
  constexpr uint32_t kPauseIfAbove = 4;
  constexpr uint32_t kResumeIfBelow = 2;
  EXPECT_OK_AND_ASSIGN(std::shared_ptr<ExecPlan> plan, ExecPlan::Make());
  PushGenerator<util::optional<ExecBatch>> batch_producer;
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
  util::BackpressureOptions backpressure_options =
      util::BackpressureOptions::Make(kResumeIfBelow, kPauseIfAbove);
  // Trailing underscore avoids shadowing the schema() factory function.
  std::shared_ptr<Schema> schema_ = schema({field("data", uint32())});
  ARROW_EXPECT_OK(compute::Declaration::Sequence(
                      {
                          {"source", SourceNodeOptions(schema_, batch_producer)},
                          {"sink", SinkNodeOptions{&sink_gen, backpressure_options}},
                      })
                      .AddToPlan(plan.get()));
  ARROW_EXPECT_OK(plan->StartProducing());

  EXPECT_OK_AND_ASSIGN(util::optional<ExecBatch> batch, ExecBatch::Make({MakeScalar(0)}));
  ASSERT_TRUE(backpressure_options.toggle->IsOpen());

  // Should be able to push kPauseIfAbove batches without triggering back pressure
  for (uint32_t i = 0; i < kPauseIfAbove; i++) {
    batch_producer.producer().Push(batch);
  }
  SleepABit();
  ASSERT_TRUE(backpressure_options.toggle->IsOpen());

  // One more batch should trigger back pressure
  batch_producer.producer().Push(batch);
  BusyWait(10, [&] { return !backpressure_options.toggle->IsOpen(); });
  ASSERT_FALSE(backpressure_options.toggle->IsOpen());

  // Reading as much as we can while keeping it paused
  for (uint32_t i = kPauseIfAbove; i >= kResumeIfBelow; i--) {
    ASSERT_FINISHES_OK(sink_gen());
  }
  SleepABit();
  ASSERT_FALSE(backpressure_options.toggle->IsOpen());

  // Reading one more item should open up backpressure
  ASSERT_FINISHES_OK(sink_gen());
  BusyWait(10, [&] { return backpressure_options.toggle->IsOpen(); });
  ASSERT_TRUE(backpressure_options.toggle->IsOpen());

  // Cleanup
  batch_producer.producer().Push(IterationEnd<util::optional<ExecBatch>>());
  plan->StopProducing();
  ASSERT_FINISHES_OK(plan->finished());
}
290
TEST(ExecPlan, ToString) {
  // Pins the exact textual rendering of ExecPlan::ToString() and
  // ExecNode::ToString() for three plan shapes: a trivial source->sink plan,
  // a linear pipeline with filter/project/group-by, and a plan with a union
  // (two sources merging into one consumer).
  auto basic_data = MakeBasicBatches();
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;

  // Plan 1: source -> sink.
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  ASSERT_OK(Declaration::Sequence(
                {
                    {"source", SourceNodeOptions{basic_data.schema,
                                                 basic_data.gen(/*parallel=*/false,
                                                                /*slow=*/false)}},
                    {"sink", SinkNodeOptions{&sink_gen}},
                })
                .AddToPlan(plan.get()));
  EXPECT_EQ(plan->sources()[0]->ToString(), R"(SourceNode{"source", outputs=["sink"]})");
  EXPECT_EQ(plan->sinks()[0]->ToString(),
            R"(SinkNode{"sink", inputs=[collected: "source"]})");
  EXPECT_EQ(plan->ToString(), R"(ExecPlan with 2 nodes:
SourceNode{"source", outputs=["sink"]}
SinkNode{"sink", inputs=[collected: "source"]}
)");

  // Plan 2: linear pipeline ending in an ordered sink.  Note the two filter
  // nodes share the label "filter"; labels need not be unique.
  ASSERT_OK_AND_ASSIGN(plan, ExecPlan::Make());
  CountOptions options(CountOptions::ONLY_VALID);
  ASSERT_OK(
      Declaration::Sequence(
          {
              {"source",
               SourceNodeOptions{basic_data.schema,
                                 basic_data.gen(/*parallel=*/false, /*slow=*/false)}},
              {"filter", FilterNodeOptions{greater_equal(field_ref("i32"), literal(0))}},
              {"project", ProjectNodeOptions{{
                              field_ref("bool"),
                              call("multiply", {field_ref("i32"), literal(2)}),
                          }}},
              {"aggregate",
               AggregateNodeOptions{
                   /*aggregates=*/{{"hash_sum", nullptr}, {"hash_count", &options}},
                   /*targets=*/{"multiply(i32, 2)", "multiply(i32, 2)"},
                   /*names=*/{"sum(multiply(i32, 2))", "count(multiply(i32, 2))"},
                   /*keys=*/{"bool"}}},
              {"filter", FilterNodeOptions{greater(field_ref("sum(multiply(i32, 2))"),
                                                   literal(10))}},
              {"order_by_sink",
               OrderBySinkNodeOptions{
                   SortOptions({SortKey{"sum(multiply(i32, 2))", SortOrder::Ascending}}),
                   &sink_gen}},
          })
          .AddToPlan(plan.get()));
  EXPECT_EQ(plan->ToString(), R"a(ExecPlan with 6 nodes:
SourceNode{"source", outputs=["filter"]}
FilterNode{"filter", inputs=[target: "source"], outputs=["project"], filter=(i32 >= 0)}
ProjectNode{"project", inputs=[target: "filter"], outputs=["aggregate"], projection=[bool, multiply(i32, 2)]}
GroupByNode{"aggregate", inputs=[groupby: "project"], outputs=["filter"], keys=["bool"], aggregates=[
	hash_sum(multiply(i32, 2)),
	hash_count(multiply(i32, 2), {mode=NON_NULL}),
]}
FilterNode{"filter", inputs=[target: "aggregate"], outputs=["order_by_sink"], filter=(sum(multiply(i32, 2)) > 10)}
OrderBySinkNode{"order_by_sink", inputs=[collected: "filter"], by={sort_keys=[sum(multiply(i32, 2)) ASC], null_placement=AtEnd}}
)a");

  // Plan 3: two labeled sources unioned, then a scalar aggregate and sink.
  ASSERT_OK_AND_ASSIGN(plan, ExecPlan::Make());
  Declaration union_node{"union", ExecNodeOptions{}};
  Declaration lhs{"source",
                  SourceNodeOptions{basic_data.schema,
                                    basic_data.gen(/*parallel=*/false, /*slow=*/false)}};
  lhs.label = "lhs";
  Declaration rhs{"source",
                  SourceNodeOptions{basic_data.schema,
                                    basic_data.gen(/*parallel=*/false, /*slow=*/false)}};
  rhs.label = "rhs";
  union_node.inputs.emplace_back(lhs);
  union_node.inputs.emplace_back(rhs);
  ASSERT_OK(
      Declaration::Sequence(
          {
              union_node,
              {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"count", &options}},
                                                 /*targets=*/{"i32"},
                                                 /*names=*/{"count(i32)"},
                                                 /*keys=*/{}}},
              {"sink", SinkNodeOptions{&sink_gen}},
          })
          .AddToPlan(plan.get()));
  EXPECT_EQ(plan->ToString(), R"a(ExecPlan with 5 nodes:
SourceNode{"lhs", outputs=["union"]}
SourceNode{"rhs", outputs=["union"]}
UnionNode{"union", inputs=[input_0_label: "lhs", input_1_label: "rhs"], outputs=["aggregate"]}
ScalarAggregateNode{"aggregate", inputs=[target: "union"], outputs=["sink"], aggregates=[
	count(i32, {mode=NON_NULL}),
]}
SinkNode{"sink", inputs=[collected: "aggregate"]}
)a");
}
384
TEST(ExecPlanExecution, SourceOrderBy) {
  // An order_by_sink must emit a single batch sorted on "i32" ascending; the
  // expected output places the null key last, matching the sort's default
  // null placement.
  std::vector<ExecBatch> expected = {
      ExecBatchFromJSON({int32(), boolean()},
                        "[[4, false], [5, null], [6, false], [7, false], [null, true]]")};
  for (bool slow : {false, true}) {
    SCOPED_TRACE(slow ? "slowed" : "unslowed");

    for (bool parallel : {false, true}) {
      SCOPED_TRACE(parallel ? "parallel" : "single threaded");

      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
      AsyncGenerator<util::optional<ExecBatch>> sink_gen;

      auto basic_data = MakeBasicBatches();

      SortOptions options({SortKey("i32", SortOrder::Ascending)});
      ASSERT_OK(Declaration::Sequence(
                    {
                        {"source", SourceNodeOptions{basic_data.schema,
                                                     basic_data.gen(parallel, slow)}},
                        {"order_by_sink", OrderBySinkNodeOptions{options, &sink_gen}},
                    })
                    .AddToPlan(plan.get()));

      // ElementsAreArray (not Unordered): the output order itself is under test.
      ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                  Finishes(ResultWith(ElementsAreArray(expected))));
    }
  }
}
414
TEST(ExecPlanExecution, SourceSinkError) {
  // A source generator that errors after exhausting its batches must
  // propagate that error through the sink.
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;

  auto basic_data = MakeBasicBatches();
  auto next_batch = basic_data.batches.cbegin();
  AsyncGenerator<util::optional<ExecBatch>> error_source_gen =
      [&]() -> Result<util::optional<ExecBatch>> {
    if (next_batch != basic_data.batches.cend()) {
      return util::make_optional(*next_batch++);
    }
    return Status::Invalid("Artificial error");
  };

  ASSERT_OK(Declaration::Sequence(
                {
                    {"source", SourceNodeOptions{basic_data.schema, error_source_gen}},
                    {"sink", SinkNodeOptions{&sink_gen}},
                })
                .AddToPlan(plan.get()));

  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
              Finishes(Raises(StatusCode::Invalid, HasSubstr("Artificial"))));
}
439
TEST(ExecPlanExecution, SourceConsumingSink) {
  // A consuming sink receives every batch via Consume(), and the plan does
  // not finish until the consumer's Finish() future resolves -- even after
  // the source itself has finished.
  for (bool slow : {false, true}) {
    SCOPED_TRACE(slow ? "slowed" : "unslowed");

    for (bool parallel : {false, true}) {
      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
      std::atomic<uint32_t> batches_seen{0};
      Future<> finish = Future<>::Make();
      // Counts consumed batches and defers completion until `finish` is
      // marked finished by the test.
      struct TestConsumer : public SinkNodeConsumer {
        TestConsumer(std::atomic<uint32_t>* batches_seen, Future<> finish)
            : batches_seen(batches_seen), finish(std::move(finish)) {}

        Status Consume(ExecBatch batch) override {
          (*batches_seen)++;
          return Status::OK();
        }

        Future<> Finish() override { return finish; }

        std::atomic<uint32_t>* batches_seen;
        Future<> finish;
      };
      std::shared_ptr<TestConsumer> consumer =
          std::make_shared<TestConsumer>(&batches_seen, finish);

      auto basic_data = MakeBasicBatches();
      ASSERT_OK_AND_ASSIGN(
          auto source, MakeExecNode("source", plan.get(), {},
                                    SourceNodeOptions(basic_data.schema,
                                                      basic_data.gen(parallel, slow))));
      ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source},
                             ConsumingSinkNodeOptions(consumer)));
      ASSERT_OK(plan->StartProducing());
      // Source should finish fairly quickly
      ASSERT_FINISHES_OK(source->finished());
      SleepABit();
      // MakeBasicBatches produces two batches, both consumed by now.
      ASSERT_EQ(2, batches_seen);
      // Consumer isn't finished and so plan shouldn't have finished
      AssertNotFinished(plan->finished());
      // Mark consumption complete, plan should finish
      finish.MarkFinished();
      ASSERT_FINISHES_OK(plan->finished());
    }
  }
}
486
TEST(ExecPlanExecution, ConsumingSinkError) {
  // Errors raised by a consumer -- either while consuming a batch or while
  // finishing -- must surface as the plan's completion status.
  struct ConsumeErrorConsumer : public SinkNodeConsumer {
    Status Consume(ExecBatch batch) override { return Status::Invalid("XYZ"); }
    Future<> Finish() override { return Future<>::MakeFinished(); }
  };
  struct FinishErrorConsumer : public SinkNodeConsumer {
    Status Consume(ExecBatch batch) override { return Status::OK(); }
    Future<> Finish() override { return Future<>::MakeFinished(Status::Invalid("XYZ")); }
  };
  std::vector<std::shared_ptr<SinkNodeConsumer>> consumers{
      std::make_shared<ConsumeErrorConsumer>(), std::make_shared<FinishErrorConsumer>()};

  for (auto& consumer : consumers) {
    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
    auto basic_data = MakeBasicBatches();
    // A single source -> consuming_sink pipeline suffices.  (This test
    // previously also added a redundant second source/consuming_sink pair via
    // MakeExecNode, producing two parallel pipelines sharing one consumer.)
    ASSERT_OK(Declaration::Sequence(
                  {{"source",
                    SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))},
                   {"consuming_sink", ConsumingSinkNodeOptions(consumer)}})
                  .AddToPlan(plan.get()));
    ASSERT_OK(plan->StartProducing());
    ASSERT_FINISHES_AND_RAISES(Invalid, plan->finished());
  }
}
517
TEST(ExecPlanExecution, ConsumingSinkErrorFinish) {
  // A consumer whose Finish() future resolves to an error must fail the plan,
  // even though every batch was consumed successfully.
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  struct FinishErrorConsumer : public SinkNodeConsumer {
    Status Consume(ExecBatch batch) override { return Status::OK(); }
    Future<> Finish() override { return Future<>::MakeFinished(Status::Invalid("XYZ")); }
  };
  std::shared_ptr<FinishErrorConsumer> consumer = std::make_shared<FinishErrorConsumer>();

  auto basic_data = MakeBasicBatches();
  // A single source -> consuming_sink pipeline suffices.  (This test
  // previously also added a redundant second source/consuming_sink pair via
  // MakeExecNode, producing two parallel pipelines sharing one consumer.)
  ASSERT_OK(
      Declaration::Sequence(
          {{"source", SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))},
           {"consuming_sink", ConsumingSinkNodeOptions(consumer)}})
          .AddToPlan(plan.get()));
  ASSERT_OK(plan->StartProducing());
  ASSERT_FINISHES_AND_RAISES(Invalid, plan->finished());
}
541
TEST(ExecPlanExecution, StressSourceSink) {
  // Stress test: pushes a few hundred random batches through source -> sink
  // and verifies all of them arrive (order is not guaranteed).
  for (bool slow : {false, true}) {
    SCOPED_TRACE(slow ? "slowed" : "unslowed");

    for (bool parallel : {false, true}) {
      SCOPED_TRACE(parallel ? "parallel" : "single threaded");

      // Fewer batches for the slowest combination (slow + serial) to keep the
      // test's runtime reasonable.
      int num_batches = (slow && !parallel) ? 30 : 300;

      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
      AsyncGenerator<util::optional<ExecBatch>> sink_gen;

      auto random_data = MakeRandomBatches(
          schema({field("a", int32()), field("b", boolean())}), num_batches);

      ASSERT_OK(Declaration::Sequence(
                    {
                        {"source", SourceNodeOptions{random_data.schema,
                                                     random_data.gen(parallel, slow)}},
                        {"sink", SinkNodeOptions{&sink_gen}},
                    })
                    .AddToPlan(plan.get()));

      ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                  Finishes(ResultWith(UnorderedElementsAreArray(random_data.batches))));
    }
  }
}
570
TEST(ExecPlanExecution, StressSourceOrderBy) {
  // Stress test for order_by_sink: the collected output must equal the input
  // table reordered by SortIndices with the same sort options.
  auto input_schema = schema({field("a", int32()), field("b", boolean())});
  for (bool slow : {false, true}) {
    SCOPED_TRACE(slow ? "slowed" : "unslowed");

    for (bool parallel : {false, true}) {
      SCOPED_TRACE(parallel ? "parallel" : "single threaded");

      // Fewer batches for the slowest (slow + serial) combination.
      int num_batches = (slow && !parallel) ? 30 : 300;

      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
      AsyncGenerator<util::optional<ExecBatch>> sink_gen;

      auto random_data = MakeRandomBatches(input_schema, num_batches);

      SortOptions options({SortKey("a", SortOrder::Ascending)});
      ASSERT_OK(Declaration::Sequence(
                    {
                        {"source", SourceNodeOptions{random_data.schema,
                                                     random_data.gen(parallel, slow)}},
                        {"order_by_sink", OrderBySinkNodeOptions{options, &sink_gen}},
                    })
                    .AddToPlan(plan.get()));

      // Check that data is sorted appropriately
      ASSERT_FINISHES_OK_AND_ASSIGN(auto exec_batches,
                                    StartAndCollect(plan.get(), sink_gen));
      ASSERT_OK_AND_ASSIGN(auto actual, TableFromExecBatches(input_schema, exec_batches));
      ASSERT_OK_AND_ASSIGN(auto original,
                           TableFromExecBatches(input_schema, random_data.batches));
      // Reference result: sort the original table with the same options.
      ASSERT_OK_AND_ASSIGN(auto sort_indices, SortIndices(original, options));
      ASSERT_OK_AND_ASSIGN(auto expected, Take(original, sort_indices));
      AssertTablesEqual(*actual, *expected.table());
    }
  }
}
607
TEST(ExecPlanExecution, StressSourceGroupedSumStop) {
  // Stopping a plan mid-flight through a grouped aggregation must shut down
  // cleanly: finished() resolves OK under all slow/parallel combinations.
  auto input_schema = schema({field("a", int32()), field("b", boolean())});
  for (bool slow : {false, true}) {
    SCOPED_TRACE(slow ? "slowed" : "unslowed");

    for (bool parallel : {false, true}) {
      SCOPED_TRACE(parallel ? "parallel" : "single threaded");

      // Fewer batches for the slowest (slow + serial) combination.
      int num_batches = (slow && !parallel) ? 30 : 300;

      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
      AsyncGenerator<util::optional<ExecBatch>> sink_gen;

      auto random_data = MakeRandomBatches(input_schema, num_batches);

      // (Removed an unused SortOptions local that was never referenced.)
      ASSERT_OK(Declaration::Sequence(
                    {
                        {"source", SourceNodeOptions{random_data.schema,
                                                     random_data.gen(parallel, slow)}},
                        {"aggregate",
                         AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
                                              /*targets=*/{"a"}, /*names=*/{"sum(a)"},
                                              /*keys=*/{"b"}}},
                        {"sink", SinkNodeOptions{&sink_gen}},
                    })
                    .AddToPlan(plan.get()));

      ASSERT_OK(plan->Validate());
      ASSERT_OK(plan->StartProducing());
      plan->StopProducing();
      ASSERT_FINISHES_OK(plan->finished());
    }
  }
}
643
TEST(ExecPlanExecution, StressSourceSinkStopped) {
  // After pulling just one batch from the sink, StopProducing() must bring
  // the plan to an OK finish even though most batches were never consumed.
  for (bool slow : {false, true}) {
    SCOPED_TRACE(slow ? "slowed" : "unslowed");

    for (bool parallel : {false, true}) {
      SCOPED_TRACE(parallel ? "parallel" : "single threaded");

      // Fewer batches for the slowest (slow + serial) combination.
      int num_batches = (slow && !parallel) ? 30 : 300;

      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
      AsyncGenerator<util::optional<ExecBatch>> sink_gen;

      auto random_data = MakeRandomBatches(
          schema({field("a", int32()), field("b", boolean())}), num_batches);

      ASSERT_OK(Declaration::Sequence(
                    {
                        {"source", SourceNodeOptions{random_data.schema,
                                                     random_data.gen(parallel, slow)}},
                        {"sink", SinkNodeOptions{&sink_gen}},
                    })
                    .AddToPlan(plan.get()));

      ASSERT_OK(plan->Validate());
      ASSERT_OK(plan->StartProducing());

      // The first batch emitted is expected to be the first input batch.
      EXPECT_THAT(sink_gen(), Finishes(ResultWith(Optional(random_data.batches[0]))));

      plan->StopProducing();
      ASSERT_THAT(plan->finished(), Finishes(Ok()));
    }
  }
}
677
TEST(ExecPlanExecution, SourceFilterSink) {
  // Filtering on i32 == 6 leaves the first basic batch empty and keeps a
  // single row from the second.
  auto input = MakeBasicBatches();

  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  AsyncGenerator<util::optional<ExecBatch>> collected_gen;

  Declaration source_decl{"source",
                          SourceNodeOptions{input.schema, input.gen(/*parallel=*/false,
                                                                    /*slow=*/false)}};
  Declaration filter_decl{"filter",
                          FilterNodeOptions{equal(field_ref("i32"), literal(6))}};
  Declaration sink_decl{"sink", SinkNodeOptions{&collected_gen}};
  ASSERT_OK(Declaration::Sequence({source_decl, filter_decl, sink_decl})
                .AddToPlan(plan.get()));

  ASSERT_THAT(StartAndCollect(plan.get(), collected_gen),
              Finishes(ResultWith(UnorderedElementsAreArray(
                  {ExecBatchFromJSON({int32(), boolean()}, "[]"),
                   ExecBatchFromJSON({int32(), boolean()}, "[[6, false]]")}))));
}
699
TEST(ExecPlanExecution, SourceProjectSink) {
  // Projection replaces the input columns with two computed expressions:
  // NOT(bool) and i32 + 1, with explicit output names "!bool" and "i32 + 1".
  auto basic_data = MakeBasicBatches();

  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;

  ASSERT_OK(Declaration::Sequence(
                {
                    {"source", SourceNodeOptions{basic_data.schema,
                                                 basic_data.gen(/*parallel=*/false,
                                                                /*slow=*/false)}},
                    {"project",
                     ProjectNodeOptions{{
                                            not_(field_ref("bool")),
                                            call("add", {field_ref("i32"), literal(1)}),
                                        },
                                        {"!bool", "i32 + 1"}}},
                    {"sink", SinkNodeOptions{&sink_gen}},
                })
                .AddToPlan(plan.get()));

  // Nulls propagate through both expressions (e.g. NOT(null) is null).
  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
              Finishes(ResultWith(UnorderedElementsAreArray(
                  {ExecBatchFromJSON({boolean(), int32()}, "[[false, null], [true, 5]]"),
                   ExecBatchFromJSON({boolean(), int32()},
                                     "[[null, 6], [true, 7], [true, 8]]")}))));
}
727
728 namespace {
729
MakeGroupableBatches(int multiplicity=1)730 BatchesWithSchema MakeGroupableBatches(int multiplicity = 1) {
731 BatchesWithSchema out;
732
733 out.batches = {ExecBatchFromJSON({int32(), utf8()}, R"([
734 [12, "alfa"],
735 [7, "beta"],
736 [3, "alfa"]
737 ])"),
738 ExecBatchFromJSON({int32(), utf8()}, R"([
739 [-2, "alfa"],
740 [-1, "gama"],
741 [3, "alfa"]
742 ])"),
743 ExecBatchFromJSON({int32(), utf8()}, R"([
744 [5, "gama"],
745 [3, "beta"],
746 [-8, "alfa"]
747 ])")};
748
749 size_t batch_count = out.batches.size();
750 for (int repeat = 1; repeat < multiplicity; ++repeat) {
751 for (size_t i = 0; i < batch_count; ++i) {
752 out.batches.push_back(out.batches[i]);
753 }
754 }
755
756 out.schema = schema({field("i32", int32()), field("str", utf8())});
757
758 return out;
759 }
760
761 } // namespace
762
TEST(ExecPlanExecution, SourceGroupedSum) {
  // Grouped hash_sum over the "str" key.  In the parallel case the input is
  // replicated 100x, so every expected sum scales by the same factor.
  for (bool parallel : {false, true}) {
    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");

    auto input = MakeGroupableBatches(/*multiplicity=*/parallel ? 100 : 1);

    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
    AsyncGenerator<util::optional<ExecBatch>> sink_gen;

    ASSERT_OK(Declaration::Sequence(
                  {
                      {"source", SourceNodeOptions{input.schema,
                                                   input.gen(parallel, /*slow=*/false)}},
                      {"aggregate",
                       AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
                                            /*targets=*/{"i32"}, /*names=*/{"sum(i32)"},
                                            /*keys=*/{"str"}}},
                      {"sink", SinkNodeOptions{&sink_gen}},
                  })
                  .AddToPlan(plan.get()));

    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                Finishes(ResultWith(UnorderedElementsAreArray({ExecBatchFromJSON(
                    {int64(), utf8()},
                    parallel ? R"([[800, "alfa"], [1000, "beta"], [400, "gama"]])"
                             : R"([[8, "alfa"], [10, "beta"], [4, "gama"]])")}))));
  }
}
791
TEST(ExecPlanExecution, SourceFilterProjectGroupedSumFilter) {
  // Full pipeline: drop negative i32 values, project i32*2, hash_sum grouped
  // by "str", then filter out groups whose sum does not exceed the threshold.
  // Both the expected sums and the threshold scale with the batch multiplicity.
  for (bool parallel : {false, true}) {
    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");

    int batch_multiplicity = parallel ? 100 : 1;
    auto input = MakeGroupableBatches(/*multiplicity=*/batch_multiplicity);

    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
    AsyncGenerator<util::optional<ExecBatch>> sink_gen;

    ASSERT_OK(
        Declaration::Sequence(
            {
                {"source",
                 SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}},
                {"filter",
                 FilterNodeOptions{greater_equal(field_ref("i32"), literal(0))}},
                {"project", ProjectNodeOptions{{
                                field_ref("str"),
                                call("multiply", {field_ref("i32"), literal(2)}),
                            }}},
                {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
                                                   /*targets=*/{"multiply(i32, 2)"},
                                                   /*names=*/{"sum(multiply(i32, 2))"},
                                                   /*keys=*/{"str"}}},
                // The "gama" group is eliminated by this post-aggregation filter.
                {"filter", FilterNodeOptions{greater(field_ref("sum(multiply(i32, 2))"),
                                                     literal(10 * batch_multiplicity))}},
                {"sink", SinkNodeOptions{&sink_gen}},
            })
            .AddToPlan(plan.get()));

    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                Finishes(ResultWith(UnorderedElementsAreArray({ExecBatchFromJSON(
                    {int64(), utf8()}, parallel ? R"([[3600, "alfa"], [2000, "beta"]])"
                                                : R"([[36, "alfa"], [20, "beta"]])")}))));
  }
}
829
TEST(ExecPlanExecution, SourceFilterProjectGroupedSumOrderBy) {
  // Same pipeline as SourceFilterProjectGroupedSumFilter, but terminated by an
  // order_by_sink sorting on "str" descending -- so the output order (beta
  // before alfa) is asserted exactly rather than unordered.
  for (bool parallel : {false, true}) {
    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");

    int batch_multiplicity = parallel ? 100 : 1;
    auto input = MakeGroupableBatches(/*multiplicity=*/batch_multiplicity);

    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
    AsyncGenerator<util::optional<ExecBatch>> sink_gen;

    SortOptions options({SortKey("str", SortOrder::Descending)});
    ASSERT_OK(
        Declaration::Sequence(
            {
                {"source",
                 SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}},
                {"filter",
                 FilterNodeOptions{greater_equal(field_ref("i32"), literal(0))}},
                {"project", ProjectNodeOptions{{
                                field_ref("str"),
                                call("multiply", {field_ref("i32"), literal(2)}),
                            }}},
                {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
                                                   /*targets=*/{"multiply(i32, 2)"},
                                                   /*names=*/{"sum(multiply(i32, 2))"},
                                                   /*keys=*/{"str"}}},
                {"filter", FilterNodeOptions{greater(field_ref("sum(multiply(i32, 2))"),
                                                     literal(10 * batch_multiplicity))}},
                {"order_by_sink", OrderBySinkNodeOptions{options, &sink_gen}},
            })
            .AddToPlan(plan.get()));

    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                Finishes(ResultWith(ElementsAreArray({ExecBatchFromJSON(
                    {int64(), utf8()}, parallel ? R"([[2000, "beta"], [3600, "alfa"]])"
                                                : R"([[20, "beta"], [36, "alfa"]])")}))));
  }
}
868
TEST(ExecPlanExecution, SourceFilterProjectGroupedSumTopK) {
  for (bool parallel : {false, true}) {
    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");

    // Parallel execution replicates the input 100x, scaling the expected sum.
    const int batch_multiplicity = parallel ? 100 : 1;
    auto batches = MakeGroupableBatches(/*multiplicity=*/batch_multiplicity);

    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
    AsyncGenerator<util::optional<ExecBatch>> sink_gen;

    // Keep only the single largest "str" key after grouping.
    SelectKOptions select_k_options = SelectKOptions::TopKDefault(/*k=*/1, {"str"});
    ASSERT_OK(
        Declaration::Sequence(
            {
                {"source",
                 SourceNodeOptions{batches.schema,
                                   batches.gen(parallel, /*slow=*/false)}},
                {"project", ProjectNodeOptions{{
                                field_ref("str"),
                                call("multiply", {field_ref("i32"), literal(2)}),
                            }}},
                {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
                                                   /*targets=*/{"multiply(i32, 2)"},
                                                   /*names=*/{"sum(multiply(i32, 2))"},
                                                   /*keys=*/{"str"}}},
                {"select_k_sink", SelectKSinkNodeOptions{select_k_options, &sink_gen}},
            })
            .AddToPlan(plan.get()));

    // "gama" sorts highest, so it is the sole surviving group.
    auto expected = ExecBatchFromJSON(
        {int64(), utf8()}, parallel ? R"([[800, "gama"]])" : R"([[8, "gama"]])");
    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                Finishes(ResultWith(ElementsAreArray({expected}))));
  }
}
903
TEST(ExecPlanExecution, SourceScalarAggSink) {
  // Scalar (non-grouped) aggregation over a simple source: sum of "i32"
  // plus any() of "bool", emitted as a single scalar-valued batch.
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;

  auto input = MakeBasicBatches();

  auto decl = Declaration::Sequence({
      {"source", SourceNodeOptions{input.schema, input.gen(/*parallel=*/false,
                                                           /*slow=*/false)}},
      {"aggregate",
       AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr}, {"any", nullptr}},
                            /*targets=*/{"i32", "bool"},
                            /*names=*/{"sum(i32)", "any(bool)"}}},
      {"sink", SinkNodeOptions{&sink_gen}},
  });
  ASSERT_OK(decl.AddToPlan(plan.get()));

  ASSERT_THAT(
      StartAndCollect(plan.get(), sink_gen),
      Finishes(ResultWith(UnorderedElementsAreArray({
          ExecBatchFromJSON({ValueDescr::Scalar(int64()), ValueDescr::Scalar(boolean())},
                            "[[22, true]]"),
      }))));
}
930
TEST(ExecPlanExecution, AggregationPreservesOptions) {
  // ARROW-13638: aggregation nodes initialize per-thread kernel state lazily
  // and need to keep a copy/strong reference to function options
  //
  // Both sub-cases below deliberately destroy the FunctionOptions (by letting
  // the shared_ptr leave an inner scope) BEFORE the plan is started; the
  // statement order here is therefore essential to what is being tested.
  {
    // Case 1: scalar (non-grouped) aggregation with TDigestOptions.
    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
    AsyncGenerator<util::optional<ExecBatch>> sink_gen;

    auto basic_data = MakeBasicBatches();

    {
      // `options` is owned only by this inner scope; the aggregate node is
      // handed a raw pointer via options.get().
      auto options = std::make_shared<TDigestOptions>(TDigestOptions::Defaults());
      ASSERT_OK(Declaration::Sequence(
                    {
                        {"source", SourceNodeOptions{basic_data.schema,
                                                     basic_data.gen(/*parallel=*/false,
                                                                    /*slow=*/false)}},
                        {"aggregate",
                         AggregateNodeOptions{/*aggregates=*/{{"tdigest", options.get()}},
                                              /*targets=*/{"i32"},
                                              /*names=*/{"tdigest(i32)"}}},
                        {"sink", SinkNodeOptions{&sink_gen}},
                    })
                    .AddToPlan(plan.get()));
    }
    // `options` has been destroyed by now; the plan only produces the right
    // result if the node retained its own copy of the options.

    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                Finishes(ResultWith(UnorderedElementsAreArray({
                    ExecBatchFromJSON({ValueDescr::Array(float64())}, "[[5.5]]"),
                }))));
  }
  {
    // Case 2: grouped (hash) aggregation with CountOptions.
    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
    AsyncGenerator<util::optional<ExecBatch>> sink_gen;

    auto data = MakeGroupableBatches(/*multiplicity=*/100);

    {
      // As above: the CountOptions do not outlive this inner scope.
      auto options = std::make_shared<CountOptions>(CountOptions::Defaults());
      ASSERT_OK(
          Declaration::Sequence(
              {
                  {"source", SourceNodeOptions{data.schema, data.gen(/*parallel=*/false,
                                                                     /*slow=*/false)}},
                  {"aggregate",
                   AggregateNodeOptions{/*aggregates=*/{{"hash_count", options.get()}},
                                        /*targets=*/{"i32"},
                                        /*names=*/{"count(i32)"},
                                        /*keys=*/{"str"}}},
                  {"sink", SinkNodeOptions{&sink_gen}},
              })
              .AddToPlan(plan.get()));
    }

    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                Finishes(ResultWith(UnorderedElementsAreArray({
                    ExecBatchFromJSON({int64(), utf8()},
                                      R"([[500, "alfa"], [200, "beta"], [200, "gama"]])"),
                }))));
  }
}
991
TEST(ExecPlanExecution, ScalarSourceScalarAggSink) {
  // ARROW-9056: scalar aggregation can be done over scalars, taking
  // into account batch.length > 1 (e.g. a partition column)
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;

  BatchesWithSchema scalar_data;
  // First batch carries scalar-valued columns (the scalar counts once per
  // row of the batch); the second batch uses ordinary arrays for the same
  // schema, so the aggregates must combine both representations.
  scalar_data.batches = {
      ExecBatchFromJSON({ValueDescr::Scalar(int32()), ValueDescr::Scalar(boolean())},
                        "[[5, false], [5, false], [5, false]]"),
      ExecBatchFromJSON({int32(), boolean()}, "[[5, true], [6, false], [7, true]]")};
  scalar_data.schema = schema({field("a", int32()), field("b", boolean())});

  // index can't be tested as it's order-dependent
  // mode/quantile can't be tested as they're technically vector kernels
  ASSERT_OK(
      Declaration::Sequence(
          {
              {"source",
               SourceNodeOptions{scalar_data.schema, scalar_data.gen(/*parallel=*/false,
                                                                     /*slow=*/false)}},
              // Nine scalar aggregates over the two columns; each target in
              // /*targets=*/ pairs positionally with an aggregate and a name.
              {"aggregate", AggregateNodeOptions{
                                /*aggregates=*/{{"all", nullptr},
                                                {"any", nullptr},
                                                {"count", nullptr},
                                                {"mean", nullptr},
                                                {"product", nullptr},
                                                {"stddev", nullptr},
                                                {"sum", nullptr},
                                                {"tdigest", nullptr},
                                                {"variance", nullptr}},
                                /*targets=*/{"b", "b", "a", "a", "a", "a", "a", "a", "a"},
                                /*names=*/
                                {"all(b)", "any(b)", "count(a)", "mean(a)", "product(a)",
                                 "stddev(a)", "sum(a)", "tdigest(a)", "variance(a)"}}},
              {"sink", SinkNodeOptions{&sink_gen}},
          })
          .AddToPlan(plan.get()));

  // Expected values computed over the six logical rows
  // a = [5,5,5,5,6,7], b = [false,false,false,true,false,true].
  // Note tdigest emits an Array result while the other kernels emit Scalars.
  ASSERT_THAT(
      StartAndCollect(plan.get(), sink_gen),
      Finishes(ResultWith(UnorderedElementsAreArray({
          ExecBatchFromJSON(
              {ValueDescr::Scalar(boolean()), ValueDescr::Scalar(boolean()),
               ValueDescr::Scalar(int64()), ValueDescr::Scalar(float64()),
               ValueDescr::Scalar(int64()), ValueDescr::Scalar(float64()),
               ValueDescr::Scalar(int64()), ValueDescr::Array(float64()),
               ValueDescr::Scalar(float64())},
              R"([[false, true, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0, 0.5833333333333334]])"),
      }))));
}
1043
TEST(ExecPlanExecution, ScalarSourceGroupedSum) {
  // ARROW-14630: ensure grouped aggregation with a scalar key/array input doesn't error
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;

  // Each batch mixes an array column "a" with a scalar key column "b".
  BatchesWithSchema input;
  input.batches = {
      ExecBatchFromJSON({int32(), ValueDescr::Scalar(boolean())},
                        "[[5, false], [6, false], [7, false]]"),
      ExecBatchFromJSON({int32(), ValueDescr::Scalar(boolean())},
                        "[[1, true], [2, true], [3, true]]"),
  };
  input.schema = schema({field("a", int32()), field("b", boolean())});

  // Sort the grouped output by key, descending (true before false).
  SortOptions sort_options({SortKey("b", SortOrder::Descending)});
  ASSERT_OK(Declaration::Sequence(
                {
                    {"source", SourceNodeOptions{input.schema,
                                                 input.gen(/*parallel=*/false,
                                                           /*slow=*/false)}},
                    {"aggregate",
                     AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
                                          /*targets=*/{"a"}, /*names=*/{"hash_sum(a)"},
                                          /*keys=*/{"b"}}},
                    {"order_by_sink", OrderBySinkNodeOptions{sort_options, &sink_gen}},
                })
                .AddToPlan(plan.get()));

  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
              Finishes(ResultWith(UnorderedElementsAreArray({
                  ExecBatchFromJSON({int64(), boolean()}, R"([[6, true], [18, false]])"),
              }))));
}
1077
TEST(ExecPlanExecution, SelfInnerHashJoinSink) {
  for (bool parallel : {false, true}) {
    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");

    auto input = MakeGroupableBatches();

    // Use the CPU thread pool only when exercising the parallel path.
    auto exec_ctx = arrow::internal::make_unique<ExecContext>(
        default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr);

    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get()));
    AsyncGenerator<util::optional<ExecBatch>> sink_gen;

    // Both sides of the self-join read from the same generated batches.
    auto make_source = [&]() {
      return MakeExecNode(
          "source", plan.get(), {},
          SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)});
    };
    ASSERT_OK_AND_ASSIGN(ExecNode * left_source, make_source());
    ASSERT_OK_AND_ASSIGN(ExecNode * right_source, make_source());

    ASSERT_OK_AND_ASSIGN(
        auto left_filter,
        MakeExecNode("filter", plan.get(), {left_source},
                     FilterNodeOptions{greater_equal(field_ref("i32"), literal(-1))}));
    ASSERT_OK_AND_ASSIGN(
        auto right_filter,
        MakeExecNode("filter", plan.get(), {right_source},
                     FilterNodeOptions{less_equal(field_ref("i32"), literal(2))}));

    // left side: [3, "alfa"], [3, "alfa"], [12, "alfa"], [3, "beta"], [7, "beta"],
    //            [-1, "gama"], [5, "gama"]
    // right side: [-2, "alfa"], [-8, "alfa"], [-1, "gama"]

    HashJoinNodeOptions join_opts{JoinType::INNER,
                                  /*left_keys=*/{"str"},
                                  /*right_keys=*/{"str"}, "l_", "r_"};

    ASSERT_OK_AND_ASSIGN(
        auto hashjoin,
        MakeExecNode("hashjoin", plan.get(), {left_filter, right_filter}, join_opts));

    ASSERT_OK_AND_ASSIGN(std::ignore, MakeExecNode("sink", plan.get(), {hashjoin},
                                                   SinkNodeOptions{&sink_gen}));

    ASSERT_FINISHES_OK_AND_ASSIGN(auto result, StartAndCollect(plan.get(), sink_gen));

    // Every matching "alfa"/"gama" key pairs across sides; "beta" has no
    // right-side match and is dropped by the inner join.
    std::vector<ExecBatch> expected = {
        ExecBatchFromJSON({int32(), utf8(), int32(), utf8()}, R"([
            [3, "alfa", -2, "alfa"], [3, "alfa", -8, "alfa"],
            [3, "alfa", -2, "alfa"], [3, "alfa", -8, "alfa"],
            [12, "alfa", -2, "alfa"], [12, "alfa", -8, "alfa"],
            [-1, "gama", -1, "gama"], [5, "gama", -1, "gama"]])")};

    AssertExecBatchesEqual(hashjoin->output_schema(), result, expected);
  }
}
1134
TEST(ExecPlanExecution, SelfOuterHashJoinSink) {
  for (bool parallel : {false, true}) {
    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");

    auto input = MakeGroupableBatches();

    // Use the CPU thread pool only when exercising the parallel path.
    auto exec_ctx = arrow::internal::make_unique<ExecContext>(
        default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr);

    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get()));
    AsyncGenerator<util::optional<ExecBatch>> sink_gen;

    // Both sides of the self-join read from the same generated batches.
    auto make_source = [&]() {
      return MakeExecNode(
          "source", plan.get(), {},
          SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)});
    };
    ASSERT_OK_AND_ASSIGN(ExecNode * left_source, make_source());
    ASSERT_OK_AND_ASSIGN(ExecNode * right_source, make_source());

    ASSERT_OK_AND_ASSIGN(
        auto left_filter,
        MakeExecNode("filter", plan.get(), {left_source},
                     FilterNodeOptions{greater_equal(field_ref("i32"), literal(-1))}));
    ASSERT_OK_AND_ASSIGN(
        auto right_filter,
        MakeExecNode("filter", plan.get(), {right_source},
                     FilterNodeOptions{less_equal(field_ref("i32"), literal(2))}));

    // left side: [3, "alfa"], [3, "alfa"], [12, "alfa"], [3, "beta"], [7, "beta"],
    //            [-1, "gama"], [5, "gama"]
    // right side: [-2, "alfa"], [-8, "alfa"], [-1, "gama"]

    HashJoinNodeOptions join_opts{JoinType::FULL_OUTER,
                                  /*left_keys=*/{"str"},
                                  /*right_keys=*/{"str"}, "l_", "r_"};

    ASSERT_OK_AND_ASSIGN(
        auto hashjoin,
        MakeExecNode("hashjoin", plan.get(), {left_filter, right_filter}, join_opts));

    ASSERT_OK_AND_ASSIGN(std::ignore, MakeExecNode("sink", plan.get(), {hashjoin},
                                                   SinkNodeOptions{&sink_gen}));

    ASSERT_FINISHES_OK_AND_ASSIGN(auto result, StartAndCollect(plan.get(), sink_gen));

    // Unlike the INNER join test, the unmatched left-side "beta" rows
    // survive the FULL_OUTER join with null right-side columns.
    std::vector<ExecBatch> expected = {
        ExecBatchFromJSON({int32(), utf8(), int32(), utf8()}, R"([
            [3, "alfa", -2, "alfa"], [3, "alfa", -8, "alfa"],
            [3, "alfa", -2, "alfa"], [3, "alfa", -8, "alfa"],
            [12, "alfa", -2, "alfa"], [12, "alfa", -8, "alfa"],
            [3, "beta", null, null], [7, "beta", null, null],
            [-1, "gama", -1, "gama"], [5, "gama", -1, "gama"]])")};

    AssertExecBatchesEqual(hashjoin->output_schema(), result, expected);
  }
}
1192
TEST(ExecPlan, RecordBatchReaderSourceSink) {
  // Feed a plan from a RecordBatchReader and verify the batches round-trip.
  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;

  auto input = MakeBasicBatches();

  // Convert the exec batches to record batches backed by a Table...
  RecordBatchVector record_batches;
  for (const ExecBatch& exec_batch : input.batches) {
    ASSERT_OK_AND_ASSIGN(auto record_batch, exec_batch.ToRecordBatch(input.schema));
    record_batches.push_back(std::move(record_batch));
  }

  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(record_batches));
  std::shared_ptr<RecordBatchReader> reader = std::make_shared<TableBatchReader>(*table);

  // ...then wrap the reader in an async generator usable by a source node.
  ASSERT_OK_AND_ASSIGN(
      auto batch_gen,
      MakeReaderGenerator(std::move(reader), arrow::io::internal::GetIOThreadPool()));

  ASSERT_OK(Declaration::Sequence({
                {"source", SourceNodeOptions{table->schema(), batch_gen}},
                {"sink", SinkNodeOptions{&sink_gen}},
            })
                .AddToPlan(plan.get()));

  // The round trip through the reader should preserve the original batches.
  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
              Finishes(ResultWith(UnorderedElementsAreArray(input.batches))));
}
1224
1225 } // namespace compute
1226 } // namespace arrow
1227