1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include <algorithm>
19 #include <atomic>
20 #include <chrono>
21 #include <condition_variable>
22 #include <memory>
23 #include <mutex>
24 #include <ostream>
25 #include <thread>
26 #include <unordered_set>
27 #include <vector>
28
29 #include "arrow/testing/gtest_util.h"
30 #include "arrow/util/async_generator.h"
31 #include "arrow/util/iterator.h"
32 #include "arrow/util/test_common.h"
33 #include "arrow/util/vector.h"
34
35 namespace arrow {
36
37 template <typename T>
38 class TracingIterator {
39 public:
TracingIterator(Iterator<T> it)40 explicit TracingIterator(Iterator<T> it) : it_(std::move(it)), state_(new State) {}
41
Next()42 Result<T> Next() {
43 auto lock = state_->Lock();
44 state_->thread_ids_.insert(std::this_thread::get_id());
45
46 RETURN_NOT_OK(state_->GetNextStatus());
47
48 ARROW_ASSIGN_OR_RAISE(auto out, it_.Next());
49 state_->values_.push_back(out);
50
51 state_->cv_.notify_one();
52 return out;
53 }
54
55 class State {
56 public:
values()57 const std::vector<T>& values() { return values_; }
58
thread_ids()59 const std::unordered_set<std::thread::id>& thread_ids() { return thread_ids_; }
60
InsertFailure(Status st)61 void InsertFailure(Status st) {
62 auto lock = Lock();
63 next_status_ = std::move(st);
64 }
65
66 // Wait until the iterator has emitted at least `size` values
WaitForValues(int size)67 void WaitForValues(int size) {
68 auto lock = Lock();
69 cv_.wait(lock, [&]() { return values_.size() >= static_cast<size_t>(size); });
70 }
71
AssertValuesEqual(const std::vector<T> & expected)72 void AssertValuesEqual(const std::vector<T>& expected) {
73 auto lock = Lock();
74 ASSERT_EQ(values_, expected);
75 }
76
AssertValuesStartwith(const std::vector<T> & expected)77 void AssertValuesStartwith(const std::vector<T>& expected) {
78 auto lock = Lock();
79 ASSERT_TRUE(std::equal(expected.begin(), expected.end(), values_.begin()));
80 }
81
Lock()82 std::unique_lock<std::mutex> Lock() { return std::unique_lock<std::mutex>(mutex_); }
83
84 private:
85 friend TracingIterator;
86
GetNextStatus()87 Status GetNextStatus() {
88 if (next_status_.ok()) {
89 return Status::OK();
90 }
91
92 Status st = std::move(next_status_);
93 next_status_ = Status::OK();
94 return st;
95 }
96
97 Status next_status_;
98 std::vector<T> values_;
99 std::unordered_set<std::thread::id> thread_ids_;
100
101 std::mutex mutex_;
102 std::condition_variable cv_;
103 };
104
state() const105 const std::shared_ptr<State>& state() const { return state_; }
106
107 private:
108 Iterator<T> it_;
109
110 std::shared_ptr<State> state_;
111 };
112
113 template <typename T>
EmptyIt()114 inline Iterator<T> EmptyIt() {
115 return MakeEmptyIterator<T>();
116 }
117
118 // Non-templated version of VectorIt<T> to allow better type deduction
VectorIt(std::vector<TestInt> v)119 inline Iterator<TestInt> VectorIt(std::vector<TestInt> v) {
120 return MakeVectorIterator<TestInt>(std::move(v));
121 }
122
123 template <typename Fn, typename T>
FilterIt(Iterator<T> it,Fn && fn)124 inline Iterator<T> FilterIt(Iterator<T> it, Fn&& fn) {
125 return MakeFilterIterator(std::forward<Fn>(fn), std::move(it));
126 }
127
128 template <typename T>
FlattenIt(Iterator<Iterator<T>> its)129 inline Iterator<T> FlattenIt(Iterator<Iterator<T>> its) {
130 return MakeFlattenIterator(std::move(its));
131 }
132
133 template <typename T>
AssertIteratorMatch(std::vector<T> expected,Iterator<T> actual)134 void AssertIteratorMatch(std::vector<T> expected, Iterator<T> actual) {
135 EXPECT_EQ(expected, IteratorToVector(std::move(actual)));
136 }
137
138 template <typename T>
AssertIteratorNoMatch(std::vector<T> expected,Iterator<T> actual)139 void AssertIteratorNoMatch(std::vector<T> expected, Iterator<T> actual) {
140 EXPECT_NE(expected, IteratorToVector(std::move(actual)));
141 }
142
143 template <typename T>
AssertIteratorNext(T expected,Iterator<T> & it)144 void AssertIteratorNext(T expected, Iterator<T>& it) {
145 ASSERT_OK_AND_ASSIGN(T actual, it.Next());
146 ASSERT_EQ(expected, actual);
147 }
148
149 // --------------------------------------------------------------------
150 // Synchronous iterator tests
151
TEST(TestEmptyIterator, Basic) {
  // An empty iterator produces no values at all.
  AssertIteratorMatch(std::vector<TestInt>{}, EmptyIt<TestInt>());
}
153
TEST(TestVectorIterator, Basic) {
  // Sequences that must match exactly.
  AssertIteratorMatch({}, VectorIt({}));
  AssertIteratorMatch({1, 2, 3}, VectorIt({1, 2, 3}));

  // Sequences differing in length or content must not match.
  AssertIteratorNoMatch({1}, VectorIt({}));
  AssertIteratorNoMatch({}, VectorIt({1, 2, 3}));
  AssertIteratorNoMatch({1, 2, 2}, VectorIt({1, 2, 3}));
  AssertIteratorNoMatch({1, 2, 3, 1}, VectorIt({1, 2, 3}));

  // int does not have specialized IterationTraits. The pointing iterator is
  // expected to yield the addresses of the elements it was given; moving the
  // vector transfers its buffer, so the addresses taken here stay valid.
  std::vector<int> storage = {0, 1, 2, 3, 4, 5};
  std::vector<int*> addresses;
  addresses.reserve(storage.size());
  for (size_t idx = 0; idx < storage.size(); ++idx) {
    addresses.push_back(&storage[idx]);
  }
  AssertIteratorMatch(addresses, MakeVectorPointingIterator(std::move(storage)));
}
171
TEST(TestVectorIterator, RangeForLoop) {
  std::vector<TestInt> ints = {1, 2, 3, 4};

  // Range-for over an iterator of copyable values visits each element in order.
  auto expected = ints.begin();
  for (auto maybe_i : VectorIt(ints)) {
    ASSERT_OK_AND_ASSIGN(TestInt i, maybe_i);
    ASSERT_EQ(i, *expected++);
  }
  ASSERT_EQ(expected, ints.end()) << *expected << "@" << (expected - ints.begin());

  // The same loop shape works when the element type is move-only.
  std::vector<std::unique_ptr<TestInt>> boxed;
  boxed.reserve(ints.size());
  for (const TestInt& value : ints) {
    boxed.emplace_back(new TestInt(value));
  }

  expected = ints.begin();
  for (auto maybe_i_ptr : MakeVectorIterator(std::move(boxed))) {
    ASSERT_OK_AND_ASSIGN(std::unique_ptr<TestInt> i_ptr, maybe_i_ptr);
    ASSERT_EQ(*i_ptr, *expected++);
  }
  ASSERT_EQ(expected, ints.end());
}
195
MakeFirstN(int n)196 Transformer<TestInt, TestStr> MakeFirstN(int n) {
197 int remaining = n;
198 return [remaining](TestInt next) mutable -> Result<TransformFlow<TestStr>> {
199 if (remaining > 0) {
200 remaining--;
201 return TransformYield(TestStr(next));
202 }
203 return TransformFinish();
204 };
205 }
206
207 template <typename T>
MakeFirstNGeneric(int n)208 Transformer<T, T> MakeFirstNGeneric(int n) {
209 int remaining = n;
210 return [remaining](T next) mutable -> Result<TransformFlow<T>> {
211 if (remaining > 0) {
212 remaining--;
213 return TransformYield(next);
214 }
215 return TransformFinish();
216 };
217 }
218
TEST(TestIteratorTransform, Truncating) {
  // Only the first two of the three inputs survive the transform.
  auto first_two = MakeTransformedIterator(VectorIt({1, 2, 3}), MakeFirstN(2));
  AssertIteratorMatch({"1", "2"}, std::move(first_two));
}
224
TEST(TestIteratorTransform, TestPointer) {
  // Transforms also work when the element type is a smart pointer.
  using IntPtr = std::shared_ptr<int>;
  auto source = VectorIt<IntPtr>(
      {std::make_shared<int>(1), std::make_shared<int>(2), std::make_shared<int>(3)});
  auto first_two =
      MakeTransformedIterator(std::move(source), MakeFirstNGeneric<IntPtr>(2));
  ASSERT_OK_AND_ASSIGN(auto result, first_two.ToVector());
  ASSERT_EQ(2, result.size());
}
233
TEST(TestIteratorTransform, TruncatingShort) {
  // Failsafe case: the source runs dry before the transformer ever returns
  // TransformFinish(), so Finish is never called.
  auto truncated =
      MakeTransformedIterator<TestInt, TestStr>(VectorIt({1}), MakeFirstN(2));
  AssertIteratorMatch({"1"}, std::move(truncated));
}
241
TEST(TestIteratorTransform, SkipSome) {
  // Exercises TransformSkip: the filter drops the value 2 only.
  auto drop_two = MakeFilter([](TestInt& x) { return x.value != 2; });
  auto filtered = MakeTransformedIterator(VectorIt({1, 2, 3}), drop_two);
  AssertIteratorMatch({"1", "3"}, std::move(filtered));
}
249
TEST(TestIteratorTransform, SkipAll) {
  // Exercises TransformSkip: every value is dropped, leaving an empty result.
  auto drop_everything = MakeFilter([](TestInt&) { return false; });
  auto filtered = MakeTransformedIterator(VectorIt({1, 2, 3}), drop_everything);
  AssertIteratorMatch({}, std::move(filtered));
}
257
MakeAbortOnSecond()258 Transformer<TestInt, TestStr> MakeAbortOnSecond() {
259 int counter = 0;
260 return [counter](TestInt next) mutable -> Result<TransformFlow<TestStr>> {
261 if (counter++ == 1) {
262 return Status::Invalid("X");
263 }
264 return TransformYield(TestStr(next));
265 };
266 }
267
TEST(TestIteratorTransform, Abort) {
  auto transformed = MakeTransformedIterator(VectorIt({1, 2, 3}), MakeAbortOnSecond());
  // The first value passes, the second call surfaces the transformer's error,
  // and afterwards the iterator reports end of iteration.
  ASSERT_OK(transformed.Next());
  ASSERT_RAISES(Invalid, transformed.Next());
  ASSERT_OK_AND_ASSIGN(auto after_error, transformed.Next());
  ASSERT_TRUE(IsIterationEnd(after_error));
}
276
277 template <typename T>
MakeRepeatN(int repeat_count)278 Transformer<T, T> MakeRepeatN(int repeat_count) {
279 int current_repeat = 0;
280 return [repeat_count, current_repeat](T next) mutable -> Result<TransformFlow<T>> {
281 current_repeat++;
282 bool ready_for_next = false;
283 if (current_repeat == repeat_count) {
284 current_repeat = 0;
285 ready_for_next = true;
286 }
287 return TransformYield(next, ready_for_next);
288 };
289 }
290
TEST(TestIteratorTransform, Repeating) {
  // Each input should come out twice, in order.
  auto doubled = MakeTransformedIterator<TestInt, TestInt>(VectorIt({1, 2, 3}),
                                                           MakeRepeatN<TestInt>(2));
  AssertIteratorMatch({1, 1, 2, 2, 3, 3}, std::move(doubled));
}
297
TEST(TestFunctionIterator, RangeForLoop) {
  // Yields 0, 1, 2 and then returns IndexError on every subsequent call.
  int i = 0;
  auto fails_at_3 = MakeFunctionIterator([&]() -> Result<TestInt> {
    if (i >= 3) {
      return Status::IndexError("fails at 3");
    }
    return i++;
  });

  int expected_i = 0;
  for (auto maybe_i : fails_at_3) {
    if (expected_i < 3) {
      ASSERT_OK(maybe_i.status());
      ASSERT_EQ(*maybe_i, expected_i);
    } else if (expected_i == 3) {
      ASSERT_RAISES(IndexError, maybe_i.status());
    }
    ASSERT_LE(expected_i, 3) << "iteration stops after an error is encountered";
    ++expected_i;
  }
  // Without this check, a loop that ended early (or never ran) would pass
  // vacuously: the three values and then exactly one error must have been seen.
  ASSERT_EQ(expected_i, 4);
}
319
TEST(FilterIterator, Basic) {
  // Accepting every element leaves the sequence unchanged.
  AssertIteratorMatch({1, 2, 3, 4}, FilterIt(VectorIt({1, 2, 3, 4}), [](TestInt value) {
                        return FilterIterator::Accept(std::move(value));
                      }));

  // Rejecting every element produces nothing.
  AssertIteratorMatch({}, FilterIt(VectorIt({1, 2, 3, 4}), [](TestInt) {
                        return FilterIterator::Reject<TestInt>();
                      }));

  // Keeping only the even elements.
  AssertIteratorMatch({2, 4}, FilterIt(VectorIt({1, 2, 3, 4}), [](TestInt value) {
                        return value.value % 2 != 0
                                   ? FilterIterator::Reject<TestInt>()
                                   : FilterIterator::Accept(std::move(value));
                      }));
}
334
TEST(FlattenVectorIterator, Basic) {
  // Flatten expects to consume Iterator<Iterator<T>>
  AssertIteratorMatch({}, FlattenIt(EmptyIt<Iterator<TestInt>>()));

  // Builds one single-element iterator per input value.
  auto singletons = [](std::vector<TestInt> values) -> std::vector<Iterator<TestInt>> {
    std::vector<Iterator<TestInt>> its;
    for (const TestInt& value : values) {
      its.push_back(VectorIt({value}));
    }
    return its;
  };

  // Exactly the right pieces.
  AssertIteratorMatch({1, 2, 3}, FlattenIt(VectorIt(singletons({1, 2, 3}))));
  // One piece missing.
  AssertIteratorNoMatch({1, 2, 3}, FlattenIt(VectorIt(singletons({1, 2}))));
  // One piece too many.
  AssertIteratorNoMatch({1, 2, 3}, FlattenIt(VectorIt(singletons({1, 2, 3, 2}))));
}
357
Join(TestInt a,TestInt b)358 Iterator<TestInt> Join(TestInt a, TestInt b) {
359 std::vector<Iterator<TestInt>> joined{2};
360 joined[0] = VectorIt({a});
361 joined[1] = VectorIt({b});
362
363 return FlattenIt(VectorIt(std::move(joined)));
364 }
365
Join(TestInt a,Iterator<TestInt> b)366 Iterator<TestInt> Join(TestInt a, Iterator<TestInt> b) {
367 std::vector<Iterator<TestInt>> joined{2};
368 joined[0] = VectorIt(std::vector<TestInt>{a});
369 joined[1] = std::move(b);
370
371 return FlattenIt(VectorIt(std::move(joined)));
372 }
373
TEST(FlattenVectorIterator, Pyramid) {
  // Five levels of nested Join(); flattening must preserve the overall order.
  auto innermost = Join(3, 3);
  auto nested = Join(1, Join(2, Join(2, Join(3, std::move(innermost)))));
  AssertIteratorMatch({1, 2, 2, 3, 3, 3}, std::move(nested));
}
378
TEST(ReadaheadIterator, Empty) {
  // Reading ahead over an empty source produces nothing.
  constexpr int kReadahead = 2;
  ASSERT_OK_AND_ASSIGN(auto readahead, MakeReadaheadIterator(VectorIt({}), kReadahead));
  AssertIteratorMatch({}, std::move(readahead));
}
383
TEST(ReadaheadIterator, Basic) {
  // Reading ahead must not change the produced sequence.
  constexpr int kReadahead = 2;
  ASSERT_OK_AND_ASSIGN(auto readahead,
                       MakeReadaheadIterator(VectorIt({1, 2, 3, 4, 5}), kReadahead));
  AssertIteratorMatch({1, 2, 3, 4, 5}, std::move(readahead));
}
388
TEST(ReadaheadIterator, NotExhausted) {
  // Only the first two values are consumed; the iterator is then destroyed
  // before reaching the end of its source.
  constexpr int kReadahead = 2;
  ASSERT_OK_AND_ASSIGN(auto readahead,
                       MakeReadaheadIterator(VectorIt({1, 2, 3, 4, 5}), kReadahead));
  AssertIteratorNext(TestInt(1), readahead);
  AssertIteratorNext(TestInt(2), readahead);
}
394
TEST(ReadaheadIterator, Trace) {
  TracingIterator<TestInt> traced(VectorIt({1, 2, 3, 4, 5, 6, 7, 8}));
  auto trace = traced.state();
  ASSERT_EQ(trace->values().size(), 0);

  ASSERT_OK_AND_ASSIGN(auto readahead,
                       MakeReadaheadIterator(Iterator<TestInt>(std::move(traced)), 2));
  // Background iterator won't start pumping until the first request comes in.
  SleepABit();
  ASSERT_EQ(trace->values().size(), 0);

  // Asking for one value should pull that value plus two read ahead.
  AssertIteratorNext(TestInt(1), readahead);
  trace->WaitForValues(3);
  trace->AssertValuesEqual({1, 2, 3});

  // The readahead queue is full, so nothing further should be pulled.
  SleepABit();
  trace->AssertValuesEqual({1, 2, 3});

  for (int v = 2; v <= 4; ++v) {
    AssertIteratorNext(TestInt(v), readahead);
  }
  trace->WaitForValues(6);
  SleepABit();
  trace->AssertValuesEqual({1, 2, 3, 4, 5, 6});

  for (int v = 5; v <= 7; ++v) {
    AssertIteratorNext(TestInt(v), readahead);
  }
  trace->WaitForValues(9);
  SleepABit();
  // The ninth recorded value is the end-of-iteration value TestInt().
  trace->AssertValuesEqual({1, 2, 3, 4, 5, 6, 7, 8, {}});

  AssertIteratorNext(TestInt(8), readahead);
  AssertIteratorExhausted(readahead);
  AssertIteratorExhausted(readahead);  // Still exhausted on a second request.
  trace->WaitForValues(9);
  SleepABit();
  trace->AssertValuesStartwith({1, 2, 3, 4, 5, 6, 7, 8, {}});

  // A couple more end-of-iteration values may have been recorded after that.
  const auto& recorded = trace->values();
  ASSERT_LE(recorded.size(), 11);
  for (size_t idx = 9; idx < recorded.size(); ++idx) {
    ASSERT_EQ(recorded[idx], TestInt());
  }

  // All values were pulled on a single thread, and not on this test thread.
  const auto& producers = trace->thread_ids();
  ASSERT_EQ(producers.size(), 1);
  ASSERT_NE(*producers.begin(), std::this_thread::get_id());
}
446
TEST(ReadaheadIterator, NextError) {
  TracingIterator<TestInt> traced{VectorIt({1, 2, 3})};
  auto trace = traced.state();
  ASSERT_EQ(trace->values().size(), 0);

  // Make the very first pull from the wrapped iterator fail.
  trace->InsertFailure(Status::IOError("xxx"));

  ASSERT_OK_AND_ASSIGN(auto readahead,
                       MakeReadaheadIterator(Iterator<TestInt>(std::move(traced)), 2));

  // The injected error surfaces once; afterwards the iterator reports
  // exhaustion and no value has been recorded by the tracing wrapper.
  ASSERT_RAISES(IOError, readahead.Next().status());

  AssertIteratorExhausted(readahead);
  SleepABit();
  trace->AssertValuesEqual({});
  AssertIteratorExhausted(readahead);
}
464
465 } // namespace arrow
466