1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include <algorithm>
19 #include <atomic>
20 #include <chrono>
21 #include <condition_variable>
22 #include <memory>
23 #include <mutex>
24 #include <ostream>
25 #include <thread>
26 #include <unordered_set>
27 #include <vector>
28 
29 #include "arrow/testing/gtest_util.h"
30 #include "arrow/util/async_generator.h"
31 #include "arrow/util/iterator.h"
32 #include "arrow/util/test_common.h"
33 #include "arrow/util/vector.h"
34 
35 namespace arrow {
36 
37 template <typename T>
38 class TracingIterator {
39  public:
TracingIterator(Iterator<T> it)40   explicit TracingIterator(Iterator<T> it) : it_(std::move(it)), state_(new State) {}
41 
Next()42   Result<T> Next() {
43     auto lock = state_->Lock();
44     state_->thread_ids_.insert(std::this_thread::get_id());
45 
46     RETURN_NOT_OK(state_->GetNextStatus());
47 
48     ARROW_ASSIGN_OR_RAISE(auto out, it_.Next());
49     state_->values_.push_back(out);
50 
51     state_->cv_.notify_one();
52     return out;
53   }
54 
55   class State {
56    public:
values()57     const std::vector<T>& values() { return values_; }
58 
thread_ids()59     const std::unordered_set<std::thread::id>& thread_ids() { return thread_ids_; }
60 
InsertFailure(Status st)61     void InsertFailure(Status st) {
62       auto lock = Lock();
63       next_status_ = std::move(st);
64     }
65 
66     // Wait until the iterator has emitted at least `size` values
WaitForValues(int size)67     void WaitForValues(int size) {
68       auto lock = Lock();
69       cv_.wait(lock, [&]() { return values_.size() >= static_cast<size_t>(size); });
70     }
71 
AssertValuesEqual(const std::vector<T> & expected)72     void AssertValuesEqual(const std::vector<T>& expected) {
73       auto lock = Lock();
74       ASSERT_EQ(values_, expected);
75     }
76 
AssertValuesStartwith(const std::vector<T> & expected)77     void AssertValuesStartwith(const std::vector<T>& expected) {
78       auto lock = Lock();
79       ASSERT_TRUE(std::equal(expected.begin(), expected.end(), values_.begin()));
80     }
81 
Lock()82     std::unique_lock<std::mutex> Lock() { return std::unique_lock<std::mutex>(mutex_); }
83 
84    private:
85     friend TracingIterator;
86 
GetNextStatus()87     Status GetNextStatus() {
88       if (next_status_.ok()) {
89         return Status::OK();
90       }
91 
92       Status st = std::move(next_status_);
93       next_status_ = Status::OK();
94       return st;
95     }
96 
97     Status next_status_;
98     std::vector<T> values_;
99     std::unordered_set<std::thread::id> thread_ids_;
100 
101     std::mutex mutex_;
102     std::condition_variable cv_;
103   };
104 
state() const105   const std::shared_ptr<State>& state() const { return state_; }
106 
107  private:
108   Iterator<T> it_;
109 
110   std::shared_ptr<State> state_;
111 };
112 
113 template <typename T>
EmptyIt()114 inline Iterator<T> EmptyIt() {
115   return MakeEmptyIterator<T>();
116 }
117 
118 // Non-templated version of VectorIt<T> to allow better type deduction
VectorIt(std::vector<TestInt> v)119 inline Iterator<TestInt> VectorIt(std::vector<TestInt> v) {
120   return MakeVectorIterator<TestInt>(std::move(v));
121 }
122 
123 template <typename Fn, typename T>
FilterIt(Iterator<T> it,Fn && fn)124 inline Iterator<T> FilterIt(Iterator<T> it, Fn&& fn) {
125   return MakeFilterIterator(std::forward<Fn>(fn), std::move(it));
126 }
127 
128 template <typename T>
FlattenIt(Iterator<Iterator<T>> its)129 inline Iterator<T> FlattenIt(Iterator<Iterator<T>> its) {
130   return MakeFlattenIterator(std::move(its));
131 }
132 
133 template <typename T>
AssertIteratorMatch(std::vector<T> expected,Iterator<T> actual)134 void AssertIteratorMatch(std::vector<T> expected, Iterator<T> actual) {
135   EXPECT_EQ(expected, IteratorToVector(std::move(actual)));
136 }
137 
138 template <typename T>
AssertIteratorNoMatch(std::vector<T> expected,Iterator<T> actual)139 void AssertIteratorNoMatch(std::vector<T> expected, Iterator<T> actual) {
140   EXPECT_NE(expected, IteratorToVector(std::move(actual)));
141 }
142 
143 template <typename T>
AssertIteratorNext(T expected,Iterator<T> & it)144 void AssertIteratorNext(T expected, Iterator<T>& it) {
145   ASSERT_OK_AND_ASSIGN(T actual, it.Next());
146   ASSERT_EQ(expected, actual);
147 }
148 
149 // --------------------------------------------------------------------
150 // Synchronous iterator tests
151 
TEST(TestEmptyIterator, Basic) {
  // Draining an empty iterator must produce an empty vector.
  auto empty = EmptyIt<TestInt>();
  AssertIteratorMatch({}, std::move(empty));
}
153 
TEST(TestVectorIterator, Basic) {
  // Sequences that must compare equal
  AssertIteratorMatch({}, VectorIt({}));
  AssertIteratorMatch({1, 2, 3}, VectorIt({1, 2, 3}));

  // Sequences that must compare different: too short, too long, wrong
  // element, extra trailing element
  AssertIteratorNoMatch({1}, VectorIt({}));
  AssertIteratorNoMatch({}, VectorIt({1, 2, 3}));
  AssertIteratorNoMatch({1, 2, 2}, VectorIt({1, 2, 3}));
  AssertIteratorNoMatch({1, 2, 3, 1}, VectorIt({1, 2, 3}));

  // int does not have specialized IterationTraits, so iterate over pointers
  // to the elements instead. The addresses are taken before the vector is
  // moved into the iterator.
  std::vector<int> storage = {0, 1, 2, 3, 4, 5};
  std::vector<int*> addresses;
  for (size_t idx = 0; idx < storage.size(); ++idx) {
    addresses.push_back(&storage[idx]);
  }
  AssertIteratorMatch(addresses, MakeVectorPointingIterator(std::move(storage)));
}
171 
TEST(TestVectorIterator, RangeForLoop) {
  std::vector<TestInt> ints = {1, 2, 3, 4};

  // Walk the reference vector in lock step with the range-for loop
  auto ints_it = ints.begin();
  for (auto maybe_value : VectorIt(ints)) {
    ASSERT_OK_AND_ASSIGN(TestInt value, maybe_value);
    ASSERT_EQ(value, *ints_it++);
  }
  ASSERT_EQ(ints_it, ints.end()) << *ints_it << "@" << (ints_it - ints.begin());

  std::vector<std::unique_ptr<TestInt>> owned_ints;
  for (TestInt value : ints) {
    owned_ints.push_back(std::unique_ptr<TestInt>(new TestInt(value)));
  }

  // also works with move only types
  ints_it = ints.begin();
  for (auto maybe_owned : MakeVectorIterator(std::move(owned_ints))) {
    ASSERT_OK_AND_ASSIGN(std::unique_ptr<TestInt> owned, maybe_owned);
    ASSERT_EQ(*owned, *ints_it++);
  }
  ASSERT_EQ(ints_it, ints.end());
}
195 
MakeFirstN(int n)196 Transformer<TestInt, TestStr> MakeFirstN(int n) {
197   int remaining = n;
198   return [remaining](TestInt next) mutable -> Result<TransformFlow<TestStr>> {
199     if (remaining > 0) {
200       remaining--;
201       return TransformYield(TestStr(next));
202     }
203     return TransformFinish();
204   };
205 }
206 
207 template <typename T>
MakeFirstNGeneric(int n)208 Transformer<T, T> MakeFirstNGeneric(int n) {
209   int remaining = n;
210   return [remaining](T next) mutable -> Result<TransformFlow<T>> {
211     if (remaining > 0) {
212       remaining--;
213       return TransformYield(next);
214     }
215     return TransformFinish();
216   };
217 }
218 
TEST(TestIteratorTransform, Truncating) {
  // Three inputs, but the transformer finishes after yielding two of them
  auto source = VectorIt({1, 2, 3});
  auto first_two = MakeTransformedIterator(std::move(source), MakeFirstN(2));
  AssertIteratorMatch({"1", "2"}, std::move(first_two));
}
224 
TEST(TestIteratorTransform, TestPointer) {
  // Same truncation as above, with shared_ptr elements
  std::vector<std::shared_ptr<int>> pointers;
  for (int v = 1; v <= 3; ++v) {
    pointers.push_back(std::make_shared<int>(v));
  }
  auto source = VectorIt<std::shared_ptr<int>>(std::move(pointers));
  auto first_two = MakeTransformedIterator(std::move(source),
                                           MakeFirstNGeneric<std::shared_ptr<int>>(2));
  ASSERT_OK_AND_ASSIGN(auto collected, first_two.ToVector());
  ASSERT_EQ(2, collected.size());
}
233 
TEST(TestIteratorTransform, TruncatingShort) {
  // Failsafe case: the source runs dry before the transformer ever gets to
  // call Finish
  auto source = VectorIt({1});
  auto first_two =
      MakeTransformedIterator<TestInt, TestStr>(std::move(source), MakeFirstN(2));
  AssertIteratorMatch({"1"}, std::move(first_two));
}
241 
TEST(TestIteratorTransform, SkipSome) {
  // Exercises TransformSkip: the element equal to 2 is dropped
  auto source = VectorIt({1, 2, 3});
  auto drop_twos = MakeFilter([](TestInt& item) { return item.value != 2; });
  auto remaining = MakeTransformedIterator(std::move(source), drop_twos);
  AssertIteratorMatch({"1", "3"}, std::move(remaining));
}
249 
TEST(TestIteratorTransform, SkipAll) {
  // Exercises TransformSkip: the predicate rejects every element
  auto source = VectorIt({1, 2, 3});
  auto drop_everything = MakeFilter([](TestInt&) { return false; });
  auto remaining = MakeTransformedIterator(std::move(source), drop_everything);
  AssertIteratorMatch({}, std::move(remaining));
}
257 
MakeAbortOnSecond()258 Transformer<TestInt, TestStr> MakeAbortOnSecond() {
259   int counter = 0;
260   return [counter](TestInt next) mutable -> Result<TransformFlow<TestStr>> {
261     if (counter++ == 1) {
262       return Status::Invalid("X");
263     }
264     return TransformYield(TestStr(next));
265   };
266 }
267 
TEST(TestIteratorTransform, Abort) {
  auto source = VectorIt({1, 2, 3});
  auto aborting = MakeTransformedIterator(std::move(source), MakeAbortOnSecond());
  // First pull succeeds, second surfaces the transformer's error...
  ASSERT_OK(aborting.Next());
  ASSERT_RAISES(Invalid, aborting.Next());
  // ...and the pull after the error reports end of iteration
  ASSERT_OK_AND_ASSIGN(auto after_error, aborting.Next());
  ASSERT_TRUE(IsIterationEnd(after_error));
}
276 
277 template <typename T>
MakeRepeatN(int repeat_count)278 Transformer<T, T> MakeRepeatN(int repeat_count) {
279   int current_repeat = 0;
280   return [repeat_count, current_repeat](T next) mutable -> Result<TransformFlow<T>> {
281     current_repeat++;
282     bool ready_for_next = false;
283     if (current_repeat == repeat_count) {
284       current_repeat = 0;
285       ready_for_next = true;
286     }
287     return TransformYield(next, ready_for_next);
288   };
289 }
290 
TEST(TestIteratorTransform, Repeating) {
  // Every source element must come out twice, in order
  auto source = VectorIt({1, 2, 3});
  auto doubled = MakeTransformedIterator<TestInt, TestInt>(std::move(source),
                                                           MakeRepeatN<TestInt>(2));
  AssertIteratorMatch({1, 1, 2, 2, 3, 3}, std::move(doubled));
}
297 
TEST(TestFunctionIterator, RangeForLoop) {
  // Function iterator yielding 0, 1, 2 and then failing with IndexError
  int i = 0;
  auto fails_at_3 = MakeFunctionIterator([&]() -> Result<TestInt> {
    if (i >= 3) {
      return Status::IndexError("fails at 3");
    }
    return i++;
  });

  int expected_i = 0;
  for (auto maybe_i : fails_at_3) {
    if (expected_i < 3) {
      ASSERT_OK(maybe_i.status());
      ASSERT_EQ(*maybe_i, expected_i);
    } else if (expected_i == 3) {
      ASSERT_RAISES(IndexError, maybe_i.status());
    }
    ASSERT_LE(expected_i, 3) << "iteration stops after an error is encountered";
    ++expected_i;
  }

  // Without this check the test would also pass if the loop body never ran or
  // if iteration ended before the error was surfaced.
  ASSERT_EQ(expected_i, 4) << "expected three values followed by one error";
}
319 
TEST(FilterIterator, Basic) {
  // Accept everything: the sequence passes through unchanged
  AssertIteratorMatch({1, 2, 3, 4}, FilterIt(VectorIt({1, 2, 3, 4}), [](TestInt item) {
                        return FilterIterator::Accept(std::move(item));
                      }));

  // Reject everything: nothing comes out
  AssertIteratorMatch({}, FilterIt(VectorIt({1, 2, 3, 4}), [](TestInt) {
                        return FilterIterator::Reject<TestInt>();
                      }));

  // Keep only the even values
  AssertIteratorMatch({2, 4}, FilterIt(VectorIt({1, 2, 3, 4}), [](TestInt item) {
                        const bool is_even = (item.value % 2 == 0);
                        return is_even ? FilterIterator::Accept(std::move(item))
                                       : FilterIterator::Reject<TestInt>();
                      }));
}
334 
TEST(FlattenVectorIterator, Basic) {
  // Flatten expects to consume Iterator<Iterator<T>>
  AssertIteratorMatch({}, FlattenIt(EmptyIt<Iterator<TestInt>>()));

  // Turn each value into its own single-element iterator
  auto singletons =
      [](std::vector<TestInt> values) -> std::vector<Iterator<TestInt>> {
    std::vector<Iterator<TestInt>> its;
    for (const TestInt& value : values) {
      its.push_back(VectorIt({value}));
    }
    return its;
  };

  // Exactly the expected elements
  AssertIteratorMatch({1, 2, 3}, FlattenIt(VectorIt(singletons({1, 2, 3}))));

  // One element short
  AssertIteratorNoMatch({1, 2, 3}, FlattenIt(VectorIt(singletons({1, 2}))));

  // One element too many
  AssertIteratorNoMatch({1, 2, 3}, FlattenIt(VectorIt(singletons({1, 2, 3, 2}))));
}
357 
Join(TestInt a,TestInt b)358 Iterator<TestInt> Join(TestInt a, TestInt b) {
359   std::vector<Iterator<TestInt>> joined{2};
360   joined[0] = VectorIt({a});
361   joined[1] = VectorIt({b});
362 
363   return FlattenIt(VectorIt(std::move(joined)));
364 }
365 
Join(TestInt a,Iterator<TestInt> b)366 Iterator<TestInt> Join(TestInt a, Iterator<TestInt> b) {
367   std::vector<Iterator<TestInt>> joined{2};
368   joined[0] = VectorIt(std::vector<TestInt>{a});
369   joined[1] = std::move(b);
370 
371   return FlattenIt(VectorIt(std::move(joined)));
372 }
373 
TEST(FlattenVectorIterator, Pyramid) {
  // Build the nested flatten iterators from the innermost pair outwards, i.e.
  // Join(1, Join(2, Join(2, Join(3, Join(3, 3)))))
  Iterator<TestInt> it = Join(3, 3);
  for (int head : {3, 2, 2, 1}) {
    it = Join(head, std::move(it));
  }
  AssertIteratorMatch({1, 2, 2, 3, 3, 3}, std::move(it));
}
378 
TEST(ReadaheadIterator, Empty) {
  // Readahead over an empty source produces nothing
  ASSERT_OK_AND_ASSIGN(auto readahead, MakeReadaheadIterator(VectorIt({}), 2));
  AssertIteratorMatch({}, std::move(readahead));
}
383 
TEST(ReadaheadIterator, Basic) {
  // All source elements come out, in order
  ASSERT_OK_AND_ASSIGN(auto readahead,
                       MakeReadaheadIterator(VectorIt({1, 2, 3, 4, 5}), 2));
  AssertIteratorMatch({1, 2, 3, 4, 5}, std::move(readahead));
}
388 
TEST(ReadaheadIterator, NotExhausted) {
  // Consume only the first two of five elements, then let the readahead
  // iterator go out of scope
  ASSERT_OK_AND_ASSIGN(auto readahead,
                       MakeReadaheadIterator(VectorIt({1, 2, 3, 4, 5}), 2));
  AssertIteratorNext({1}, readahead);
  AssertIteratorNext({2}, readahead);
}
394 
TEST(ReadaheadIterator, Trace) {
  // Wrap the source in a TracingIterator and keep a handle on its shared
  // state, so that the values pulled by the readahead machinery can be
  // observed from this thread.
  TracingIterator<TestInt> tracing_it(VectorIt({1, 2, 3, 4, 5, 6, 7, 8}));
  auto tracing = tracing_it.state();
  ASSERT_EQ(tracing->values().size(), 0);

  ASSERT_OK_AND_ASSIGN(
      auto it, MakeReadaheadIterator(Iterator<TestInt>(std::move(tracing_it)), 2));
  SleepABit();  // Background iterator won't start pumping until first request comes in
  ASSERT_EQ(tracing->values().size(), 0);

  AssertIteratorNext({1}, it);  // Once we ask for one value we should get that one value
                                // as well as 2 read ahead

  tracing->WaitForValues(3);
  tracing->AssertValuesEqual({1, 2, 3});

  SleepABit();  // No further values should be fetched
  tracing->AssertValuesEqual({1, 2, 3});

  AssertIteratorNext({2}, it);
  AssertIteratorNext({3}, it);
  AssertIteratorNext({4}, it);
  tracing->WaitForValues(6);
  SleepABit();
  tracing->AssertValuesEqual({1, 2, 3, 4, 5, 6});

  AssertIteratorNext({5}, it);
  AssertIteratorNext({6}, it);
  AssertIteratorNext({7}, it);
  tracing->WaitForValues(9);
  SleepABit();
  // The ninth traced value is the end-of-iteration marker
  tracing->AssertValuesEqual({1, 2, 3, 4, 5, 6, 7, 8, {}});

  AssertIteratorNext({8}, it);
  AssertIteratorExhausted(it);
  AssertIteratorExhausted(it);  // Again
  tracing->WaitForValues(9);
  SleepABit();
  tracing->AssertValuesStartwith({1, 2, 3, 4, 5, 6, 7, 8, {}});

  {
    // values() and thread_ids() do not synchronize by themselves, and the
    // background thread may still be appending to the trace, so hold the
    // state lock while inspecting them.
    auto lock = tracing->Lock();

    // A couple more EOF values may have been emitted
    const auto& values = tracing->values();
    ASSERT_LE(values.size(), 11);
    for (size_t i = 9; i < values.size(); ++i) {
      ASSERT_EQ(values[i], TestInt());
    }

    // Values were all emitted from the same thread, and it's not this thread
    const auto& thread_ids = tracing->thread_ids();
    ASSERT_EQ(thread_ids.size(), 1);
    ASSERT_NE(*thread_ids.begin(), std::this_thread::get_id());
  }
}
446 
TEST(ReadaheadIterator, NextError) {
  auto source = VectorIt({1, 2, 3});
  TracingIterator<TestInt> tracing_it(std::move(source));
  auto trace = tracing_it.state();
  ASSERT_EQ(trace->values().size(), 0);

  // Arm a failure so that the very first pull from the source errors out
  trace->InsertFailure(Status::IOError("xxx"));

  ASSERT_OK_AND_ASSIGN(
      auto readahead,
      MakeReadaheadIterator(Iterator<TestInt>(std::move(tracing_it)), 2));

  // The injected error is what the consumer sees first
  ASSERT_RAISES(IOError, readahead.Next().status());

  // After the error the iterator reports exhaustion, and no value was ever
  // pulled from the source
  AssertIteratorExhausted(readahead);
  SleepABit();
  trace->AssertValuesEqual({});
  AssertIteratorExhausted(readahead);
}
464 
465 }  // namespace arrow
466