1 /*******************************************************************************
2 * tests/api/zip_node_test.cpp
3 *
4 * Part of Project Thrill - http://project-thrill.org
5 *
6 * Copyright (C) 2015-2016 Timo Bingmann <tb@panthema.net>
7 *
8 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9 ******************************************************************************/
10
11 #include <gtest/gtest.h>
12 #include <thrill/api/all_gather.hpp>
13 #include <thrill/api/collapse.hpp>
14 #include <thrill/api/generate.hpp>
15 #include <thrill/api/size.hpp>
16 #include <thrill/api/zip.hpp>
17 #include <thrill/api/zip_window.hpp>
18 #include <thrill/api/zip_with_index.hpp>
19 #include <thrill/common/string.hpp>
20
21 #include <algorithm>
22 #include <random>
23 #include <string>
24 #include <tuple>
25 #include <vector>
26
27 using namespace thrill; // NOLINT
28
//! Plain aggregate of two ints; the zip functions in the tests below return it
//! to check zipping into a user-defined type.
struct MyStruct {
    int a;
    int b;
};

//! Number of items generated for the input DIAs of most tests in this file.
static constexpr size_t test_size = 1000;
34
//! Zips the indices 0..test_size-1 with the same indices shifted by test_size
//! (stored as short) and checks every element-wise sum.
TEST(ZipNode, TwoBalancedIntegerArrays) {

    auto start_func =
        [](Context& ctx) {

            // left operand: the generated index itself
            auto lhs = Generate(
                ctx, test_size,
                [](size_t index) { return index; });

            // right operand: index + test_size, narrowed to short
            auto rhs = lhs.Map(
                [](size_t i) { return static_cast<short>(test_size + i); });

            // combine both sides by adding the paired items
            auto sums = lhs.Zip(
                rhs, [](size_t a, short b) -> long {
                    return static_cast<long>(a) + b;
                });

            // collect the zipped items and verify them
            std::vector<long> res = sums.AllGather();

            ASSERT_EQ(test_size, res.size());

            // item i must be i + (i + test_size)
            for (size_t i = 0; i != res.size(); ++i) {
                ASSERT_EQ(static_cast<long>(2 * i + test_size), res[i]);
            }
        };

    api::RunLocalTests(start_func);
}
67
//! Same scenario as the plain two-array zip, but invoked through the
//! NoRebalanceTag overload of Zip.
TEST(ZipNode, TwoBalancedIntegerArraysNoRebalance) {

    auto start_func =
        [](Context& ctx) {

            // left operand: the generated index itself
            auto lhs = Generate(
                ctx, test_size,
                [](size_t index) { return index; });

            // right operand: index + test_size, narrowed to short
            auto rhs = lhs.Map(
                [](size_t i) { return static_cast<short>(test_size + i); });

            // add the paired items, selecting the NoRebalanceTag variant
            auto sums = lhs.Zip(
                NoRebalanceTag,
                rhs, [](size_t a, short b) -> long {
                    return static_cast<long>(a) + b;
                });

            // collect the zipped items and verify them
            std::vector<long> res = sums.AllGather();

            ASSERT_EQ(test_size, res.size());

            // item i must be i + (i + test_size)
            for (size_t i = 0; i != res.size(); ++i) {
                ASSERT_EQ(static_cast<long>(2 * i + test_size), res[i]);
            }
        };

    api::RunLocalTests(start_func);
}
101
//! Zips two filtered DIAs: one keeps only the lowest tenth of its source
//! range, the other only the highest tenth. The result is gathered once via
//! Keep() and then its Size() is queried to force a second evaluation.
TEST(ZipNode, TwoDisbalancedIntegerArrays) {

    // first DIA is heavily balanced to the first workers, second DIA is
    // balanced to the last workers.
    auto start_func =
        [](Context& ctx) {

            // numbers 0..test_size-1
            auto input1 = Generate(
                ctx, test_size,
                [](size_t index) { return index; });

            // numbers test_size..2*test_size-1, stored as short
            auto input2 = input1.Map(
                [](size_t i) { return static_cast<short>(test_size + i); });

            // keep only the lowest tenth of input1
            auto zip_input1 = input1.Filter(
                [](size_t i) { return i < test_size / 10; });

            // keep only the highest tenth of input2
            auto zip_input2 = input2.Filter(
                [](size_t i) { return i >= 2 * test_size - test_size / 10; });

            // map to shorts
            auto zip_input2_short = zip_input2.Map(
                [](size_t index) { return static_cast<short>(index); });

            // zip into the user-defined struct
            auto zip_result = zip_input1.Zip(
                zip_input2_short, [](size_t a, short b) -> MyStruct {
                    return { static_cast<int>(a), b };
                });

            // check result; Keep() so that zip_result can be used again below
            std::vector<MyStruct> res = zip_result.Keep().AllGather();

            ASSERT_EQ(test_size / 10, res.size());

            for (size_t i = 0; i != res.size(); ++i) {
                // sLOG1 << i << res[i].a << res[i].b;
                ASSERT_EQ(static_cast<long>(i), res[i].a);
                ASSERT_EQ(static_cast<long>(2 * test_size - test_size / 10 + i), res[i].b);
            }

            // check size of zip (recalculates ZipNode); the expectation is
            // derived from test_size like the check above instead of a
            // hard-coded count
            ASSERT_EQ(test_size / 10, zip_result.Size());
        };

    api::RunLocalTests(start_func);
}
153
//! Runs ZipWithIndex on a filtered DIA that keeps only the top fifth of the
//! generated range and checks that the surviving items are numbered 0, 1, ...
//! in order.
TEST(ZipNode, TwoDisbalancedIntegerArraysZipWithIndex) {

    // only a single DIA is involved here; after filtering, all remaining
    // items come from the upper end of the generated range.
    auto start_func =
        [](Context& ctx) {

            // numbers 0..test_size-1 as unsigned
            auto input1 = Generate(
                ctx, test_size,
                [](size_t index) { return static_cast<unsigned>(index); });

            // keep the numbers from 8/10 of test_size upwards
            // (800..999 for test_size = 1000)
            auto zip_input1 = input1.Filter(
                [](unsigned i) { return i >= 8 * test_size / 10; });

            // pair every surviving item with the index it is handed
            auto zip_result = zip_input1.ZipWithIndex(
                [](unsigned a, size_t index) -> MyStruct {
                    return { static_cast<int>(a), static_cast<int>(index) };
                });

            // check result; Keep() so that zip_result can be used again below
            std::vector<MyStruct> res = zip_result.Keep().AllGather();

            ASSERT_EQ(2 * test_size / 10, res.size());

            for (size_t i = 0; i < res.size(); ++i) {
                // sLOG1 << i << res[i].a << res[i].b;
                ASSERT_EQ(static_cast<int>(8 * test_size / 10 + i), res[i].a);
                ASSERT_EQ(static_cast<int>(i), res[i].b);
            }

            // check size of zip (recalculates ZipNode); same expectation as
            // the gathered size above rather than a hard-coded count
            ASSERT_EQ(2 * test_size / 10, zip_result.Size());
        };

    api::RunLocalTests(start_func);
}
193
//! Zips a DIA of test_size items with a DIA generated with size 0 using
//! CutTag and expects an empty result.
TEST(ZipNode, TwoIntegerArraysWhereOneIsEmpty) {

    auto start_func =
        [](Context& ctx) {

            // populated side: numbers 0..test_size-1
            auto filled = Generate(
                ctx, test_size,
                [](size_t index) { return index; });

            // empty side: generated with a size of zero
            auto nothing = Generate(
                ctx, 0,
                [](size_t index) { return index; });

            // convert the empty side to shorts
            auto nothing_short = nothing.Map(
                [](size_t index) { return static_cast<short>(index); });

            // CutTag zip of the populated side with the empty side
            auto zip_result = filled.Zip(
                CutTag, nothing_short, [](size_t a, short b) -> long {
                    return static_cast<long>(a) + b;
                });

            // no items may come out
            std::vector<long> res = zip_result.AllGather();
            ASSERT_EQ(0u, res.size());
        };

    api::RunLocalTests(start_func);
}
226
//! Zips two differently filtered views of one DIA of generated strings with
//! CutTag and compares the outcome against the same computation done locally
//! on the gathered input.
TEST(ZipNode, TwoDisbalancedStringArrays) {

    // the two zip inputs are selected by first character, so their sizes and
    // their placement are generally unequal.
    auto start_func =
        [](Context& ctx) {

            // per item: a RandomString of length 10..20 over the lowercase
            // alphabet, seeded from the index, with the decimal index appended
            auto input_gen = Generate(
                ctx, test_size,
                [](size_t index) -> std::string {
                    std::default_random_engine rng(
                        123456 + static_cast<unsigned>(index));
                    std::uniform_int_distribution<size_t> length(10, 20);
                    rng(); // skip one number

                    return common::RandomString(
                        length(rng), rng, "abcdefghijklmnopqrstuvwxyz")
                           + std::to_string(index);
                });

            DIA<std::string> input = input_gen.Collapse();

            std::vector<std::string> vinput = input.AllGather();
            ASSERT_EQ(test_size, vinput.size());

            // keep the strings whose first character is <= 'e'
            auto input1 = input.Filter(
                [](const std::string& str) { return str[0] <= 'e'; });

            // keep the strings whose first character is >= 'w'
            auto input2 = input.Filter(
                [](const std::string& str) { return str[0] >= 'w'; });

            // zip by concatenation
            auto zip_result = input1.Zip(
                CutTag,
                input2, [](const std::string& a, const std::string& b) {
                    return a + b;
                });

            // check result
            std::vector<std::string> res = zip_result.AllGather();

            // recalculate result locally
            std::vector<std::string> check;
            {
                std::vector<std::string> v1, v2;

                for (const std::string& s1 : vinput) {
                    if (s1[0] <= 'e') v1.push_back(s1);
                    if (s1[0] >= 'w') v2.push_back(s1);
                }

                ASSERT_EQ(v1, input1.AllGather());
                ASSERT_EQ(v2, input2.AllGather());

                // only as many pairs as the shorter side provides
                const size_t pairs = std::min(v1.size(), v2.size());
                check.reserve(pairs);
                for (size_t i = 0; i < pairs; ++i) {
                    check.push_back(v1[i] + v2[i]);
                    // sLOG1 << check.back();
                }
            }

            // compare sizes first: the loop below indexes both vectors with
            // the same i and must not run past the end of check
            ASSERT_EQ(check.size(), res.size());

            for (size_t i = 0; i != res.size(); ++i) {
                sLOG0 << res[i] << " " << check[i] << (res[i] == check[i]);
            }

            ASSERT_EQ(check, res);
        };

    api::RunLocalTests(start_func);
}
301
//! Zips three DIAs of different item types with CutTag, where the middle one
//! is twice as long, and expects exactly test_size tuples.
TEST(ZipNode, ThreeIntegerArrays) {

    auto start_func =
        [](Context& ctx) {

            // shorts 0..test_size-1
            auto shorts = Generate(
                ctx, test_size,
                [](size_t index) { return static_cast<short>(index); });

            // indices 0..2*test_size-1, i.e. twice as many items
            auto indices = Generate(
                ctx, test_size * 2,
                [](size_t index) { return index; });

            // fractions index / test_size
            auto fractions = Generate(
                ctx, test_size,
                [](size_t index) {
                    return static_cast<double>(index)
                           / static_cast<double>(test_size);
                });

            // bundle one item of each input into a tuple
            auto zip_result = Zip(
                CutTag,
                [](short a, size_t b, double c) {
                    return std::make_tuple(a, b, c);
                },
                shorts, indices, fractions);

            // gather and verify
            std::vector<std::tuple<short, size_t, double> > res
                = zip_result.AllGather();

            ASSERT_EQ(test_size, res.size());
            for (size_t i = 0; i < test_size; ++i) {
                const std::tuple<short, size_t, double>& item = res[i];

                ASSERT_EQ(static_cast<short>(i), std::get<0>(item));
                ASSERT_EQ(i, std::get<1>(item));
                ASSERT_DOUBLE_EQ(
                    static_cast<double>(i) / static_cast<double>(test_size),
                    std::get<2>(item));
            }
        };

    api::RunLocalTests(start_func);
}
349
//! Zips three DIAs of different item types with PadTag, where the middle one
//! is twice as long, and expects 2 * test_size tuples in which the exhausted
//! inputs contribute the padding value 42.
TEST(ZipNode, ThreeIntegerArraysPadded) {

    auto start_func =
        [](Context& ctx) {

            // shorts 0..test_size-1
            auto shorts = Generate(
                ctx, test_size,
                [](size_t index) { return static_cast<short>(index); });

            // indices 0..2*test_size-1, i.e. twice as many items
            auto indices = Generate(
                ctx, test_size * 2,
                [](size_t index) { return index; });

            // fractions index / test_size
            auto fractions = Generate(
                ctx, test_size,
                [](size_t index) {
                    return static_cast<double>(index)
                           / static_cast<double>(test_size);
                });

            // bundle one item of each input into a tuple; the extra tuple
            // argument supplies the padding items
            auto zip_result = Zip(
                PadTag,
                [](short a, size_t b, double c) {
                    return std::make_tuple(a, b, c);
                },
                std::make_tuple(42, 42, 42),
                shorts, indices, fractions);

            // gather and verify
            std::vector<std::tuple<short, size_t, double> > res
                = zip_result.AllGather();

            ASSERT_EQ(2 * test_size, res.size());
            for (size_t i = 0; i < 2 * test_size; ++i) {
                // positions beyond the shorter inputs must hold the padding
                const bool in_short_range = i < test_size;
                const int want_short =
                    in_short_range ? static_cast<short>(i) : 42;
                const double want_fraction =
                    in_short_range
                    ? static_cast<double>(i) / static_cast<double>(test_size)
                    : 42;

                ASSERT_EQ(want_short, std::get<0>(res[i]));
                ASSERT_EQ(i, std::get<1>(res[i]));
                ASSERT_DOUBLE_EQ(want_fraction, std::get<2>(res[i]));
            }
        };

    api::RunLocalTests(start_func);
}
402
403 /******************************************************************************/
404
//! Runs ZipWindow with window sizes { 2, 3 } over inputs of 2000 and 3000
//! indices, receiving each window as a std::vector, and checks the weighted
//! sum computed per window pair.
TEST(ZipWindowNode, TwoBalancedIntegerVectors) {

    auto start_func =
        [](Context& ctx) {

            // numbers 0..1999
            auto zip_input1 = Generate(ctx, 2000);

            // numbers 0..2999
            auto zip_input2 = Generate(ctx, 3000);

            // zip windows of 2 items from the first input with windows of 3
            // items from the second
            auto zip_result = ZipWindow(
                {
                    { 2, 3 }
                },
                [](const std::vector<size_t>& a, const std::vector<size_t>& b) -> long {
                    die_unequal(2u, a.size());
                    die_unequal(3u, b.size());
                    return a[0] + 2 * a[1] + b[0] + 3 * b[1] + 7 * b[2];
                }, zip_input1, zip_input2);

            // check result
            std::vector<long> res = zip_result.AllGather();

            ASSERT_EQ(1000u, res.size());

            // window i holds a = { 2i, 2i+1 } and b = { 3i, 3i+1, 3i+2 }, so
            // the weighted sum is (6i + 2) + (33i + 17) = 39i + 19. The cast
            // keeps both sides of the comparison signed.
            for (size_t i = 0; i < res.size(); ++i) {
                ASSERT_EQ(static_cast<long>(39 * i + 19), res[i]);
            }
        };

    api::RunLocalTests(start_func);
}
439
//! Runs the ArrayTag/PadTag variant of ZipWindow with window sizes { 2, 3 }
//! over inputs of 2000 and 3000 indices, receiving each window as a
//! std::array, and checks the weighted sum computed per window pair.
TEST(ZipWindowNode, TwoBalancedIntegerArrays) {

    auto start_func =
        [](Context& ctx) {

            // numbers 0..1999
            auto zip_input1 = Generate(ctx, 2000);

            // numbers 0..2999
            auto zip_input2 = Generate(ctx, 3000);

            // zip windows of 2 items from the first input with windows of 3
            // items from the second
            auto zip_result = ZipWindow(
                ArrayTag, PadTag, {
                    { 2, 3 }
                },
                [](const std::array<size_t, 2>& a, const std::array<size_t, 3>& b) -> long {
                    die_unequal(2u, a.size());
                    die_unequal(3u, b.size());
                    return a[0] + 2 * a[1] + b[0] + 3 * b[1] + 7 * b[2];
                }, zip_input1, zip_input2);

            // check result
            std::vector<long> res = zip_result.AllGather();

            ASSERT_EQ(1000u, res.size());

            // window i holds a = { 2i, 2i+1 } and b = { 3i, 3i+1, 3i+2 }, so
            // the weighted sum is (6i + 2) + (33i + 17) = 39i + 19. The cast
            // keeps both sides of the comparison signed.
            for (size_t i = 0; i < res.size(); ++i) {
                ASSERT_EQ(static_cast<long>(39 * i + 19), res[i]);
            }
        };

    api::RunLocalTests(start_func);
}
474
475 /******************************************************************************/
476