1 /*******************************************************************************
2  * tests/api/zip_node_test.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015-2016 Timo Bingmann <tb@panthema.net>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #include <gtest/gtest.h>
12 #include <thrill/api/all_gather.hpp>
13 #include <thrill/api/collapse.hpp>
14 #include <thrill/api/generate.hpp>
15 #include <thrill/api/size.hpp>
16 #include <thrill/api/zip.hpp>
17 #include <thrill/api/zip_window.hpp>
18 #include <thrill/api/zip_with_index.hpp>
19 #include <thrill/common/string.hpp>
20 
21 #include <algorithm>
22 #include <random>
23 #include <string>
24 #include <tuple>
25 #include <vector>
26 
27 using namespace thrill; // NOLINT
28 
//! Plain aggregate holding two integers; used as the result type of the zip
//! functions in the tests below.
struct MyStruct {
    int a;
    int b;
};
32 
//! Number of items generated for the inputs of the tests in this file.
static constexpr size_t test_size{ 1000 };
34 
TEST(ZipNode, TwoBalancedIntegerArrays) {

    // Zips two DIAs of identical size element-wise and verifies every sum.
    api::RunLocalTests(
        [](Context& ctx) {

            // left input: the values 0..test_size-1
            auto lhs = Generate(
                ctx, test_size,
                [](size_t index) { return index; });

            // right input: the values test_size..2*test_size-1 as shorts
            auto rhs = lhs.Map(
                [](size_t value) {
                    return static_cast<short>(test_size + value);
                });

            // add the paired items of both inputs
            auto sums = lhs.Zip(
                rhs, [](size_t x, short y) -> long {
                    return static_cast<long>(x) + y;
                });

            // collect the result on every worker and compare it
            std::vector<long> gathered = sums.AllGather();

            ASSERT_EQ(test_size, gathered.size());

            size_t pos = 0;
            for (const long& value : gathered) {
                // item pos is pos + (test_size + pos)
                ASSERT_EQ(static_cast<long>(2 * pos + test_size), value);
                ++pos;
            }
        });
}
67 
TEST(ZipNode, TwoBalancedIntegerArraysNoRebalance) {

    // Zips two DIAs of identical size using the NoRebalanceTag variant of
    // Zip and verifies every element-wise sum.
    const auto job =
        [](Context& ctx) {

            // first operand: 0..test_size-1
            auto first = Generate(
                ctx, test_size,
                [](size_t index) { return index; });

            // second operand: test_size..2*test_size-1, converted to short
            auto second = first.Map(
                [](size_t v) { return static_cast<short>(test_size + v); });

            // zip with the NoRebalanceTag variant
            auto zipped = first.Zip(
                NoRebalanceTag,
                second, [](size_t left, short right) -> long {
                    return static_cast<long>(left) + right;
                });

            // gather and verify
            std::vector<long> output = zipped.AllGather();

            ASSERT_EQ(test_size, output.size());

            for (size_t idx = 0, count = output.size(); idx != count; ++idx) {
                const long expected =
                    static_cast<long>(idx + idx + test_size);
                ASSERT_EQ(expected, output[idx]);
            }
        };

    api::RunLocalTests(job);
}
101 
TEST(ZipNode, TwoDisbalancedIntegerArrays) {

    // Zip of two unevenly distributed DIAs: after filtering, the first DIA
    // only keeps items from the front of the generated range and the second
    // DIA only keeps items from its back.
    auto start_func =
        [](Context& ctx) {

            // number of items each filter below lets through; all size
            // checks are derived from it instead of repeating a literal
            const size_t expected_size = test_size / 10;

            // numbers 0..test_size-1 (evenly distributed to workers)
            auto input1 = Generate(
                ctx, test_size,
                [](size_t index) { return index; });

            // numbers test_size..2*test_size-1, stored as shorts
            auto input2 = input1.Map(
                [](size_t i) { return static_cast<short>(test_size + i); });

            // first tenth of input1 (concentrated on first workers)
            auto zip_input1 = input1.Filter(
                [](size_t i) { return i < test_size / 10; });

            // last tenth of input2 (concentrated on last workers)
            auto zip_input2 = input2.Filter(
                [](size_t i) { return i >= 2 * test_size - test_size / 10; });

            // map to shorts
            auto zip_input2_short = zip_input2.Map(
                [](size_t index) { return static_cast<short>(index); });

            // zip
            auto zip_result = zip_input1.Zip(
                zip_input2_short, [](size_t a, short b) -> MyStruct {
                    return { static_cast<int>(a), b };
                });

            // check result
            std::vector<MyStruct> res = zip_result.Keep().AllGather();

            ASSERT_EQ(expected_size, res.size());

            for (size_t i = 0; i != res.size(); ++i) {
                // sLOG1 << i << res[i].a << res[i].b;
                // MyStruct members are ints, so compare as int
                ASSERT_EQ(static_cast<int>(i), res[i].a);
                ASSERT_EQ(
                    static_cast<int>(2 * test_size - expected_size + i),
                    res[i].b);
            }

            // check size of zip (recalculates ZipNode)
            ASSERT_EQ(expected_size, zip_result.Size());
        };

    api::RunLocalTests(start_func);
}
153 
TEST(ZipNode, TwoDisbalancedIntegerArraysZipWithIndex) {

    // ZipWithIndex on a single unevenly distributed DIA: the filter only
    // keeps items from the back of the generated range, and every surviving
    // item must be paired with its index within the filtered DIA.
    auto start_func =
        [](Context& ctx) {

            // smallest value that passes the filter, and the number of items
            // that pass it
            const size_t first_kept = 8 * test_size / 10;
            const size_t expected_size = test_size - first_kept;

            // numbers 0..test_size-1 (evenly distributed to workers)
            auto input1 = Generate(
                ctx, test_size,
                [](size_t index) { return static_cast<unsigned>(index); });

            // numbers 8*test_size/10..test_size-1 (concentrated on last
            // workers)
            auto zip_input1 = input1.Filter(
                [](unsigned i) { return i >= 8 * test_size / 10; });

            // pair every item with its index
            auto zip_result = zip_input1.ZipWithIndex(
                [](unsigned a, size_t index) -> MyStruct {
                    return { static_cast<int>(a), static_cast<int>(index) };
                });

            // check result
            std::vector<MyStruct> res = zip_result.Keep().AllGather();

            ASSERT_EQ(expected_size, res.size());

            for (size_t i = 0; i < res.size(); ++i) {
                // sLOG1 << i << res[i].a << res[i].b;
                ASSERT_EQ(static_cast<int>(first_kept + i), res[i].a);
                ASSERT_EQ(static_cast<int>(i), res[i].b);
            }

            // check size of zip (recalculates ZipNode)
            ASSERT_EQ(expected_size, zip_result.Size());
        };

    api::RunLocalTests(start_func);
}
193 
TEST(ZipNode, TwoIntegerArraysWhereOneIsEmpty) {

    // Zips a filled DIA with an empty one using CutTag and expects an empty
    // result.
    api::RunLocalTests(
        [](Context& ctx) {

            // filled input: 0..test_size-1
            auto filled = Generate(
                ctx, test_size,
                [](size_t index) { return index; });

            // empty input: generated with size zero
            auto empty = Generate(
                ctx, 0,
                [](size_t index) { return index; });

            // convert the empty input's items to short
            auto empty_short = empty.Map(
                [](size_t value) { return static_cast<short>(value); });

            // zip both inputs with CutTag
            auto zipped = filled.Zip(
                CutTag, empty_short, [](size_t x, short y) -> long {
                    return static_cast<long>(x) + y;
                });

            // nothing may come out of the zip
            std::vector<long> gathered = zipped.AllGather();
            ASSERT_EQ(0u, gathered.size());
        });
}
226 
TEST(ZipNode,TwoDisbalancedStringArrays)227 TEST(ZipNode, TwoDisbalancedStringArrays) {
228 
229     // first DIA is heavily balanced to the first workers, second DIA is
230     // balanced to the last workers.
231     auto start_func =
232         [](Context& ctx) {
233 
234             // generate random strings with 10..20 characters
235             auto input_gen = Generate(
236                 ctx, test_size,
237                 [](size_t index) -> std::string {
238                     std::default_random_engine rng(
239                         123456 + static_cast<unsigned>(index));
240                     std::uniform_int_distribution<size_t> length(10, 20);
241                     rng(); // skip one number
242 
243                     return common::RandomString(
244                         length(rng), rng, "abcdefghijklmnopqrstuvwxyz")
245                     + std::to_string(index);
246                 });
247 
248             DIA<std::string> input = input_gen.Collapse();
249 
250             std::vector<std::string> vinput = input.AllGather();
251             ASSERT_EQ(test_size, vinput.size());
252 
253             // Filter out strings that start with a-e
254             auto input1 = input.Filter(
255                 [](const std::string& str) { return str[0] <= 'e'; });
256 
257             // Filter out strings that start with w-z
258             auto input2 = input.Filter(
259                 [](const std::string& str) { return str[0] >= 'w'; });
260 
261             // zip
262             auto zip_result = input1.Zip(
263                 CutTag,
264                 input2, [](const std::string& a, const std::string& b) {
265                     return a + b;
266                 });
267 
268             // check result
269             std::vector<std::string> res = zip_result.AllGather();
270 
271             // recalculate result locally
272             std::vector<std::string> check;
273             {
274                 std::vector<std::string> v1, v2;
275 
276                 for (size_t index = 0; index < vinput.size(); ++index) {
277                     const std::string& s1 = vinput[index];
278                     if (s1[0] <= 'e') v1.push_back(s1);
279                     if (s1[0] >= 'w') v2.push_back(s1);
280                 }
281 
282                 ASSERT_EQ(v1, input1.AllGather());
283                 ASSERT_EQ(v2, input2.AllGather());
284 
285                 for (size_t i = 0; i < std::min(v1.size(), v2.size()); ++i) {
286                     check.push_back(v1[i] + v2[i]);
287                     // sLOG1 << check.back();
288                 }
289             }
290 
291             for (size_t i = 0; i != res.size(); ++i) {
292                 sLOG0 << res[i] << " " << check[i] << (res[i] == check[i]);
293             }
294 
295             ASSERT_EQ(check.size(), res.size());
296             ASSERT_EQ(check, res);
297         };
298 
299     api::RunLocalTests(start_func);
300 }
301 
TEST(ZipNode, ThreeIntegerArrays) {

    // Zips three DIAs of different item types and sizes with CutTag into
    // tuples and verifies every tuple component.
    const auto job =
        [](Context& ctx) {

            // shorts 0..test_size-1
            auto shorts = Generate(
                ctx, test_size,
                [](size_t index) { return static_cast<short>(index); });

            // indices 0..2*test_size-1, i.e. twice as many items
            auto indices = Generate(
                ctx, test_size * 2,
                [](size_t index) { return index; });

            // fractions index / test_size for index 0..test_size-1
            auto fractions = Generate(
                ctx, test_size,
                [](size_t index) {
                    const double numerator = static_cast<double>(index);
                    const double denominator = static_cast<double>(test_size);
                    return numerator / denominator;
                });

            // combine the three inputs into tuples
            auto zipped = Zip(
                CutTag,
                [](short s, size_t n, double d) {
                    return std::make_tuple(s, n, d);
                },
                shorts, indices, fractions);

            // gather and verify
            std::vector<std::tuple<short, size_t, double> > gathered
                = zipped.AllGather();

            ASSERT_EQ(test_size, gathered.size());
            for (size_t i = 0; i < test_size; ++i) {
                const std::tuple<short, size_t, double>& entry = gathered[i];

                ASSERT_EQ(static_cast<short>(i), std::get<0>(entry));
                ASSERT_EQ(i, std::get<1>(entry));
                ASSERT_DOUBLE_EQ(
                    static_cast<double>(i) / static_cast<double>(test_size),
                    std::get<2>(entry));
            }
        };

    api::RunLocalTests(job);
}
349 
TEST(ZipNode, ThreeIntegerArraysPadded) {

    // Zips three DIAs of different item types and sizes with PadTag; the
    // result is expected to have the size of the longest input, with the
    // shorter inputs contributing the padding value 42.
    const auto job =
        [](Context& ctx) {

            // shorts 0..test_size-1
            auto shorts = Generate(
                ctx, test_size,
                [](size_t index) { return static_cast<short>(index); });

            // indices 0..2*test_size-1, i.e. the longest input
            auto indices = Generate(
                ctx, test_size * 2,
                [](size_t index) { return index; });

            // fractions index / test_size for index 0..test_size-1
            auto fractions = Generate(
                ctx, test_size,
                [](size_t index) {
                    const double numerator = static_cast<double>(index);
                    const double denominator = static_cast<double>(test_size);
                    return numerator / denominator;
                });

            // combine the three inputs into tuples, padding with 42
            auto zipped = Zip(
                PadTag,
                [](short s, size_t n, double d) {
                    return std::make_tuple(s, n, d);
                },
                std::make_tuple(42, 42, 42),
                shorts, indices, fractions);

            // gather and verify
            std::vector<std::tuple<short, size_t, double> > gathered
                = zipped.AllGather();

            ASSERT_EQ(2 * test_size, gathered.size());
            for (size_t i = 0; i < 2 * test_size; ++i) {
                // beyond test_size the two shorter inputs are exhausted
                const bool padded = !(i < test_size);

                const int want_short =
                    padded ? 42 : static_cast<short>(i);
                const double want_fraction =
                    padded
                    ? 42
                    : static_cast<double>(i) / static_cast<double>(test_size);

                ASSERT_EQ(want_short, std::get<0>(gathered[i]));
                ASSERT_EQ(i, std::get<1>(gathered[i]));
                ASSERT_DOUBLE_EQ(want_fraction, std::get<2>(gathered[i]));
            }
        };

    api::RunLocalTests(job);
}
402 
403 /******************************************************************************/
404 
TEST(ZipWindowNode, TwoBalancedIntegerVectors) {

    // ZipWindow over two DIAs with window sizes 2 and 3; the zip function
    // receives the windows as std::vectors. Both inputs yield 1000 windows.
    auto start_func =
        [](Context& ctx) {

            // numbers 0..1999
            auto zip_input1 = Generate(ctx, 2000);

            // numbers 0..2999
            auto zip_input2 = Generate(ctx, 3000);

            // zip windows of 2 items from input1 with windows of 3 items
            // from input2
            auto zip_result = ZipWindow(
                {
                    { 2, 3 }
                },
                [](const std::vector<size_t>& a, const std::vector<size_t>& b) -> long {
                    die_unequal(2u, a.size());
                    die_unequal(3u, b.size());
                    // weighted sum, so that a misplaced item changes the value
                    return static_cast<long>(
                        a[0] + 2 * a[1] + b[0] + 3 * b[1] + 7 * b[2]);
                }, zip_input1, zip_input2);

            // check result
            std::vector<long> res = zip_result.AllGather();

            ASSERT_EQ(1000u, res.size());

            for (size_t i = 0; i < res.size(); ++i) {
                // window i is a = {2i, 2i+1}, b = {3i, 3i+1, 3i+2}:
                // (2i + 2(2i+1)) + (3i + 3(3i+1) + 7(3i+2)) = 39i + 19.
                // Cast to long so both sides of the comparison are signed.
                ASSERT_EQ(static_cast<long>(39 * i + 19), res[i]);
            }
        };

    api::RunLocalTests(start_func);
}
439 
TEST(ZipWindowNode, TwoBalancedIntegerArrays) {

    // ZipWindow with ArrayTag and PadTag over two DIAs with window sizes 2
    // and 3; the zip function receives the windows as std::arrays. Both
    // inputs yield 1000 windows.
    auto start_func =
        [](Context& ctx) {

            // numbers 0..1999
            auto zip_input1 = Generate(ctx, 2000);

            // numbers 0..2999
            auto zip_input2 = Generate(ctx, 3000);

            // zip windows of 2 items from input1 with windows of 3 items
            // from input2
            auto zip_result = ZipWindow(
                ArrayTag, PadTag, {
                    { 2, 3 }
                },
                [](const std::array<size_t, 2>& a, const std::array<size_t, 3>& b) -> long {
                    die_unequal(2u, a.size());
                    die_unequal(3u, b.size());
                    // weighted sum, so that a misplaced item changes the value
                    return static_cast<long>(
                        a[0] + 2 * a[1] + b[0] + 3 * b[1] + 7 * b[2]);
                }, zip_input1, zip_input2);

            // check result
            std::vector<long> res = zip_result.AllGather();

            ASSERT_EQ(1000u, res.size());

            for (size_t i = 0; i < res.size(); ++i) {
                // window i is a = {2i, 2i+1}, b = {3i, 3i+1, 3i+2}:
                // (2i + 2(2i+1)) + (3i + 3(3i+1) + 7(3i+2)) = 39i + 19.
                // Cast to long so both sides of the comparison are signed.
                ASSERT_EQ(static_cast<long>(39 * i + 19), res[i]);
            }
        };

    api::RunLocalTests(start_func);
}
474 
475 /******************************************************************************/
476