1#
2# Licensed to the Apache Software Foundation (ASF) under one or more
3# contributor license agreements.  See the NOTICE file distributed with
4# this work for additional information regarding copyright ownership.
5# The ASF licenses this file to You under the Apache License, Version 2.0
6# (the "License"); you may not use this file except in compliance with
7# the License.  You may obtain a copy of the License at
8#
9#    http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
context("basic RDD functions")

# Start a session without Hive support and obtain the JavaSparkContext handle
# that the RDD helpers in this file take as their first argument.
sparkSession <- sparkR.session(enableHiveSupport = FALSE)
sc <- callJStatic(
  "org.apache.spark.sql.api.r.SQLUtils",
  "getJavaSparkContext",
  sparkSession
)

# Shared fixture: the integers 1..10, parallelized with 2L slices.
nums <- 1:10
rdd <- parallelize(sc, nums, 2L)

# Shared fixture: four (key, value) pairs, parallelized with 2L slices.
intPairs <- list(
  list(1L, -1),
  list(2L, 100),
  list(2L, 1),
  list(1L, 200)
)
intRdd <- parallelize(sc, intPairs, 2L)
30
test_that("get number of partitions in RDD", {
  # Both fixtures were parallelized with 2L slices.
  numsPartitions <- getNumPartitionsRDD(rdd)
  pairsPartitions <- getNumPartitionsRDD(intRdd)

  expect_equal(numsPartitions, 2)
  expect_equal(pairsPartitions, 2)
})
35
test_that("first on RDD", {
  # Head of the untouched fixture.
  expect_equal(firstRDD(rdd), 1)

  # Head after every element has been shifted up by one.
  incremented <- lapply(rdd, function(elem) { elem + 1 })
  expect_equal(firstRDD(incremented), 2)
})
41
test_that("count and length on RDD", {
  # The fixture holds the ten integers in `nums`; both accessors should agree.
  counted <- countRDD(rdd)
  measured <- lengthRDD(rdd)

  expect_equal(counted, 10)
  expect_equal(measured, 10)
})
46
test_that("count by values and keys", {
  # Residues of 1..10 modulo 3: 0 occurs 3 times, 1 four times, 2 three times.
  residues <- lapply(rdd, function(n) { n %% 3 })
  valueCounts <- countByValue(residues)
  expect_equal(
    sortKeyValueList(valueCounts),
    sortKeyValueList(list(list(0, 3L), list(1, 4L), list(2, 3L)))
  )

  # Each key of the pair fixture appears exactly twice.
  keyCounts <- countByKey(intRdd)
  expect_equal(
    sortKeyValueList(keyCounts),
    sortKeyValueList(list(list(2L, 2L), list(1L, 2L)))
  )
})
57
test_that("lapply on RDD", {
  # Doubling each element remotely must match doubling `nums` locally.
  doubled <- lapply(rdd, function(elem) { 2 * elem })
  expect_equal(collectRDD(doubled), as.list(nums * 2))
})
63
test_that("lapplyPartition on RDD", {
  # The function receives a whole partition; the expected sums are 15 and 40.
  partitionTotal <- function(part) {
    sum(unlist(part))
  }
  totals <- collectRDD(lapplyPartition(rdd, partitionTotal))
  expect_equal(totals, list(15, 40))
})
69
test_that("mapPartitions on RDD", {
  # Same per-partition summation as the lapplyPartition test, via mapPartitions.
  partitionTotal <- function(part) {
    sum(unlist(part))
  }
  totals <- collectRDD(mapPartitions(rdd, partitionTotal))
  expect_equal(totals, list(15, 40))
})
75
test_that("flatMap() on RDDs", {
  # Emitting each pair twice should yield every pair duplicated in place.
  duplicated <- flatMap(intRdd, function(pair) { list(pair, pair) })
  expect_equal(collectRDD(duplicated), rep(intPairs, each = 2))
})
81
test_that("filterRDD on RDD", {
  # Keep only the even numbers.
  evens <- filterRDD(rdd, function(n) { n %% 2 == 0 })
  expect_equal(collectRDD(evens), list(2, 4, 6, 8, 10))

  # Filter() with the predicate first: keep pairs whose value is negative.
  negatives <- Filter(function(pair) { pair[[2]] < 0 }, intRdd)
  expect_equal(collectRDD(negatives), list(list(1L, -1)))

  # Filter out all elements.
  nothing <- filterRDD(rdd, function(n) { n > 10 })
  expect_equal(collectRDD(nothing), list())
})
96
test_that("lookup on RDD", {
  # Key 1L is paired with -1 and 200 in the fixture.
  found <- lookup(intRdd, 1L)
  expect_equal(found, list(-1, 200))

  # Key 3L does not occur, so the result is an empty list.
  missing <- lookup(intRdd, 3L)
  expect_equal(missing, list())
})
104
test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
  # Chain twelve partition-indexed transformations; each one multiplies the
  # partition's values by the partition index and adds the loop counter.
  chained <- rdd
  for (i in 1:12) {
    chained <- lapplyPartitionsWithIndex(
      chained,
      function(partIndex, part) {
        as.list(unlist(part) * partIndex + i)
      }
    )
  }

  # Finish with an element-wise doubling.
  chained <- lapply(chained, function(elem) { elem + elem })

  expected <- list(
    24, 24, 24, 24, 24,
    168, 170, 172, 174, 176
  )
  expect_equal(collectRDD(chained), expected)
})
118
test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkpoint()", {
  # Derive a PipelinedRDD from the plain fixture RDD.
  rdd2 <- lapplyPartitionsWithIndex(
            rdd,
            function(partIndex, part) {
              as.list(unlist(part) * partIndex)
            })

  # cacheRDD() flags the RDD it is called on; an RDD derived from it
  # starts out unflagged.
  cacheRDD(rdd2)
  expect_true(rdd2@env$isCached)
  rdd2 <- lapply(rdd2, function(x) x)
  expect_false(rdd2@env$isCached)

  unpersistRDD(rdd2)
  expect_false(rdd2@env$isCached)

  # Same expectations for persistRDD() with an explicit storage level.
  persistRDD(rdd2, "MEMORY_AND_DISK")
  expect_true(rdd2@env$isCached)
  rdd2 <- lapply(rdd2, function(x) x)
  expect_false(rdd2@env$isCached)

  unpersistRDD(rdd2)
  expect_false(rdd2@env$isCached)

  # Checkpoint into a throw-away directory.
  tempDir <- tempfile(pattern = "checkpoint")
  setCheckpointDir(sc, tempDir)
  checkpoint(rdd2)
  expect_true(rdd2@env$isCheckpointed)

  rdd2 <- lapply(rdd2, function(x) x)
  expect_false(rdd2@env$isCached)
  expect_false(rdd2@env$isCheckpointed)

  # make sure the data is collectable
  collectRDD(rdd2)

  # tempDir is a directory: unlink() only removes directories when
  # recursive = TRUE, otherwise the checkpoint data is left behind.
  unlink(tempDir, recursive = TRUE)
})
159
test_that("reduce on RDD", {
  # Reduce with the operator given by name.
  totalByName <- reduce(rdd, "+")
  expect_equal(totalByName, 55)

  # Also test with an inline function
  totalInline <- reduce(rdd, function(acc, elem) { acc + elem })
  expect_equal(totalInline, 55)
})
168
test_that("lapply with dependency", {
  # The mapped function refers to a variable defined outside of it.
  factor5 <- 5
  scaled <- lapply(rdd, function(elem) { factor5 * elem })

  expect_equal(collectRDD(scaled), as.list(nums * 5))
})
176
test_that("lapplyPartitionsWithIndex on RDDs", {
  # Tag each partition's sum with the partition index.
  indexAndSum <- function(partIndex, part) {
    list(partIndex, Reduce("+", part))
  }
  summed <- lapplyPartitionsWithIndex(rdd, indexAndSum)
  expect_equal(
    collectRDD(summed, flatten = FALSE),
    list(list(0, 15), list(1, 40))
  )

  # Re-partition key/value pairs by key parity, then tag each partition's
  # contents with the partition index.
  pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
  partitionByParity <- function(key) {
    if (key %% 2 == 1) 0 else 1
  }
  indexAndContents <- function(partIndex, part) {
    list(partIndex, part)
  }
  byParity <- partitionByRDD(pairsRDD, 2L, partitionByParity)
  tagged <- lapplyPartitionsWithIndex(byParity, indexAndContents)
  expect_equal(
    collectRDD(tagged, flatten = FALSE),
    list(
      list(0, list(list(1, 2), list(3, 4))),
      list(1, list(list(4, 8)))
    )
  )
})
192
test_that("sampleRDD() on RDDs", {
  # Sampling with arguments FALSE and 1.0 is expected to return every element.
  sampled <- sampleRDD(rdd, FALSE, 1.0, 2014L)
  expect_equal(unlist(collectRDD(sampled)), nums)
})
196
test_that("takeSample() on RDDs", {
  # ported from RDDSuite.scala, modified seeds
  population <- parallelize(sc, 1:100, 2L)
  seeds <- 4:5

  # Every drawn element must lie within the 1..100 population.
  expectAllInRange <- function(drawn) {
    for (value in drawn) {
      expect_true(value >= 1 && value <= 100)
    }
  }

  # Second argument FALSE, 20 requested: 20 distinct elements.
  for (seed in seeds) {
    drawn <- takeSample(population, FALSE, 20L, seed)
    expect_equal(length(drawn), 20L)
    expect_equal(length(unique(drawn)), 20L)
    expectAllInRange(drawn)
  }

  # Second argument FALSE, 200 requested: all 100 elements, each once.
  for (seed in seeds) {
    drawn <- takeSample(population, FALSE, 200L, seed)
    expect_equal(length(drawn), 100L)
    expect_equal(length(unique(drawn)), 100L)
    expectAllInRange(drawn)
  }

  # Second argument TRUE, 20 requested: 20 elements, duplicates allowed.
  for (seed in seeds) {
    drawn <- takeSample(population, TRUE, 20L, seed)
    expect_equal(length(drawn), 20L)
    expectAllInRange(drawn)
  }

  for (seed in seeds) {
    drawn <- takeSample(population, TRUE, 100L, seed)
    expect_equal(length(drawn), 100L)
    # Chance of getting all distinct elements is astronomically low, so test we
    # got less than 100
    expect_true(length(unique(drawn)) < 100L)
  }

  for (seed in seeds) {
    drawn <- takeSample(population, TRUE, 200L, seed)
    expect_equal(length(drawn), 200L)
    # Chance of getting all distinct elements is still quite low, so test we
    # got less than 100
    expect_true(length(unique(drawn)) < 100L)
  }
})
238
test_that("mapValues() on pairwise RDDs", {
  # Doubling the values remotely must match doubling them locally; the keys
  # stay as they are.
  doubled <- collectRDD(mapValues(intRdd, function(value) { value * 2 }))
  locallyDoubled <- lapply(intPairs, function(pair) {
    list(pair[[1]], pair[[2]] * 2)
  })
  expect_equal(sortKeyValueList(doubled), sortKeyValueList(locallyDoubled))
})
247
test_that("flatMapValues() on pairwise RDDs", {
  # Vector values are expanded into one pair per element, keeping the key.
  vectorValued <- parallelize(sc, list(list(1, c(1, 2)), list(2, c(3, 4))))
  expanded <- collectRDD(flatMapValues(vectorValued, function(v) { v }))
  expect_equal(
    expanded,
    list(list(1, 1), list(1, 2), list(2, 3), list(2, 4))
  )

  # Generate x to x+1 for every value
  ranged <- collectRDD(flatMapValues(intRdd, function(v) { v:(v + 1) }))
  expect_equal(
    ranged,
    list(
      list(1L, -1), list(1L, 0),
      list(2L, 100), list(2L, 101),
      list(2L, 1), list(2L, 2),
      list(1L, 200), list(1L, 201)
    )
  )
})
259
test_that("reduceByKeyLocally() on PairwiseRDDs", {
  # Numeric keys: 1 and 1.1 are distinct keys, so only the 1-keyed values merge.
  numericKeyed <- parallelize(sc, list(list(1, 2), list(1.1, 3), list(1, 4)), 2L)
  reduced <- reduceByKeyLocally(numericKeyed, "+")
  expect_equal(
    sortKeyValueList(reduced),
    sortKeyValueList(list(list(1, 6), list(1.1, 3)))
  )

  # Mixed character and numeric keys.
  mixedKeyed <- parallelize(
    sc,
    list(list("abc", 1.2), list(1.1, 0), list("abc", 1.3), list("bb", 5)),
    4L
  )
  reduced <- reduceByKeyLocally(mixedKeyed, "+")
  expect_equal(
    sortKeyValueList(reduced),
    sortKeyValueList(list(list("abc", 2.5), list(1.1, 0), list("bb", 5)))
  )
})
272
test_that("distinct() on RDDs", {
  # Every number appears twice in the input; de-duplication should give 1..10.
  repeatedNums <- rep(1:10, 2)
  repeatedRdd <- parallelize(sc, repeatedNums, 2L)
  deduplicated <- collectRDD(distinctRDD(repeatedRdd))
  expect_equal(sort(unlist(deduplicated)), nums)
})
280
test_that("maximum() on RDDs", {
  # Largest element of the 1..10 fixture.
  largest <- maximum(rdd)
  expect_equal(largest, 10)
})
285
test_that("minimum() on RDDs", {
  # Smallest element of the 1..10 fixture.
  smallest <- minimum(rdd)
  expect_equal(smallest, 1)
})
290
test_that("sumRDD() on RDDs", {
  # 1 + 2 + ... + 10
  total <- sumRDD(rdd)
  expect_equal(total, 55)
})
295
test_that("keyBy on RDDs", {
  # Each element is expected to be paired with its square as the key.
  square <- function(n) {
    n * n
  }
  keyed <- collectRDD(keyBy(rdd, square))
  expected <- lapply(nums, function(n) { list(square(n), n) })
  expect_equal(keyed, expected)
})
302
test_that("repartition/coalesce on RDDs", {
  wide <- parallelize(sc, 1:20, 4L) # each partition contains 5 elements

  # repartition down to 2: the first partition should hold about half the data
  halved <- repartitionRDD(wide, 2)
  expect_equal(getNumPartitionsRDD(halved), 2L)
  firstSize <- length(collectPartition(halved, 0L))
  expect_true(firstSize >= 8 && firstSize <= 12)

  # repartition up to 6: the first partition should hold at most 4 elements
  spread <- repartitionRDD(wide, 6)
  expect_equal(getNumPartitionsRDD(spread), 6L)
  firstSize <- length(collectPartition(spread, 0L))
  expect_true(firstSize >= 0 && firstSize <= 4)

  # coalesce into a single partition holding all 20 elements
  merged <- coalesceRDD(wide, 1)
  expect_equal(getNumPartitionsRDD(merged), 1L)
  firstSize <- length(collectPartition(merged, 0L))
  expect_equal(firstSize, 20)
})
323
test_that("sortBy() on RDDs", {
  square <- function(n) {
    n * n
  }

  # Descending sort of the ascending fixture.
  descending <- sortBy(rdd, square, ascending = FALSE)
  expect_equal(collectRDD(descending), as.list(sort(nums, decreasing = TRUE)))

  # Default sort of a descending input should restore `nums`.
  reversedRdd <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
  ascending <- sortBy(reversedRdd, square)
  expect_equal(collectRDD(ascending), as.list(nums))
})
334
test_that("takeOrdered() on RDDs", {
  # Numeric input: expect the six smallest values in ascending order.
  numbers <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
  smallest <- takeOrdered(parallelize(sc, numbers), 6L)
  expect_equal(smallest, as.list(sort(unlist(numbers)))[1:6])

  # Character input (with a duplicate): expect the three smallest strings.
  letters5 <- list("e", "d", "c", "d", "a")
  smallest <- takeOrdered(parallelize(sc, letters5), 3L)
  expect_equal(smallest, as.list(sort(unlist(letters5)))[1:3])
})
346
test_that("top() on RDDs", {
  # Numeric input: expect the six largest values in descending order.
  numbers <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
  largest <- top(parallelize(sc, numbers), 6L)
  expect_equal(largest, as.list(sort(unlist(numbers), decreasing = TRUE))[1:6])

  # Character input (with a duplicate): expect the three largest strings.
  letters5 <- list("e", "d", "c", "d", "a")
  largest <- top(parallelize(sc, letters5), 3L)
  expect_equal(largest, as.list(sort(unlist(letters5), decreasing = TRUE))[1:3])
})
358
test_that("fold() on RDDs", {
  # Folding the fixture with "+" from 0 must match a local Reduce().
  folded <- fold(rdd, 0, "+")
  expect_equal(folded, Reduce("+", nums, 0))

  # Folding an empty RDD should give back the zero value.
  emptyRdd <- parallelize(sc, list())
  folded <- fold(emptyRdd, 0, "+")
  expect_equal(folded, 0)
})
367
test_that("aggregateRDD() on RDDs", {
  # The accumulator is a (running sum, element count) pair.
  zeroValue <- list(0, 0)
  seqOp <- function(acc, elem) {
    list(acc[[1]] + elem, acc[[2]] + 1)
  }
  combOp <- function(lhs, rhs) {
    list(lhs[[1]] + rhs[[1]], lhs[[2]] + rhs[[2]])
  }

  # Four elements summing to 10.
  fourElems <- parallelize(sc, list(1, 2, 3, 4))
  expect_equal(aggregateRDD(fourElems, zeroValue, seqOp, combOp), list(10, 4))

  # An empty RDD should give back the zero value.
  emptyRdd <- parallelize(sc, list())
  expect_equal(aggregateRDD(emptyRdd, zeroValue, seqOp, combOp), list(0, 0))
})
380
test_that("zipWithUniqueId() on RDDs", {
  letters5 <- list("a", "b", "c", "d", "e")

  # With 3L slices the expected ids are unique but not consecutive.
  zipped <- collectRDD(zipWithUniqueId(parallelize(sc, letters5, 3L)))
  expect_equal(
    zipped,
    list(list("a", 0), list("b", 1), list("c", 4), list("d", 2), list("e", 5))
  )

  # With 1L slice the expected ids are simply 0..4.
  zipped <- collectRDD(zipWithUniqueId(parallelize(sc, letters5, 1L)))
  expect_equal(
    zipped,
    list(list("a", 0), list("b", 1), list("c", 2), list("d", 3), list("e", 4))
  )
})
394
test_that("zipWithIndex() on RDDs", {
  letters5 <- list("a", "b", "c", "d", "e")
  # The expected indices are 0..4 regardless of the number of slices.
  indexed <- list(
    list("a", 0), list("b", 1), list("c", 2), list("d", 3), list("e", 4)
  )

  threeSlices <- parallelize(sc, letters5, 3L)
  expect_equal(collectRDD(zipWithIndex(threeSlices)), indexed)

  oneSlice <- parallelize(sc, letters5, 1L)
  expect_equal(collectRDD(zipWithIndex(oneSlice)), indexed)
})
408
test_that("glom() on RDD", {
  # Four elements in 2L slices are expected to come back as two lists of two.
  fourElems <- parallelize(sc, as.list(1:4), 2L)
  grouped <- collectRDD(glom(fourElems))
  expect_equal(grouped, list(list(1, 2), list(3, 4)))
})
414
test_that("keys() on RDDs", {
  # The first component of every fixture pair, in order.
  expectedKeys <- lapply(intPairs, function(pair) { pair[[1]] })
  expect_equal(collectRDD(keys(intRdd)), expectedKeys)
})
420
test_that("values() on RDDs", {
  # The second component of every fixture pair, in order.
  expectedValues <- lapply(intPairs, function(pair) { pair[[2]] })
  expect_equal(collectRDD(values(intRdd)), expectedValues)
})
426
test_that("pipeRDD() on RDDs", {
  # Piping through "more" should return the elements as strings.
  echoed <- collectRDD(pipeRDD(rdd, "more"))
  expect_equal(echoed, as.list(as.character(1:10)))

  # Input with empty strings and trailing line breaks, piped through "sort".
  trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n"))
  sortedTrailed <- collectRDD(pipeRDD(trailed.rdd, "sort"))
  expect_equal(sortedTrailed, list("", "1", "2", "3"))

  # Descending input in 2L slices: the expected output is 5..9 followed by 0..4.
  rev.nums <- 9:0
  rev.rdd <- parallelize(sc, rev.nums, 2L)
  sortedHalves <- collectRDD(pipeRDD(rev.rdd, "sort"))
  expect_equal(sortedHalves, as.list(as.character(c(5:9, 0:4))))
})
443
test_that("zipRDD() on RDDs", {
  # Two parallelized integer ranges of equal length.
  lowNums <- parallelize(sc, 0:4, 2)
  highNums <- parallelize(sc, 1000:1004, 2)
  expect_equal(
    collectRDD(zipRDD(lowNums, highNums)),
    list(
      list(0, 1000), list(1, 1001), list(2, 1002),
      list(3, 1003), list(4, 1004)
    )
  )

  # Write a two-line text file to zip against.
  mockFile <- c("Spark is pretty.", "Spark is awesome.")
  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
  writeLines(mockFile, fileName)
  textRdd <- textFile(sc, fileName, 1)
  selfPaired <- lapply(mockFile, function(line) { list(line, line) })

  # A text-file RDD zipped with itself.
  expect_equal(collectRDD(zipRDD(textRdd, textRdd)), selfPaired)

  # A parallelized RDD zipped with the text-file RDD.
  indices <- parallelize(sc, 0:1, 1)
  indexedLines <- lapply(0:1, function(idx) { list(idx, mockFile[idx + 1]) })
  expect_equal(collectRDD(zipRDD(indices, textRdd)), indexedLines)

  # The text-file RDD zipped with an identity-mapped copy of itself.
  mappedText <- map(textRdd, function(line) { line })
  expect_equal(collectRDD(zipRDD(textRdd, mappedText)), selfPaired)

  unlink(fileName)
})
472
test_that("cartesian() on RDDs", {
  # 1..3 crossed with itself yields all nine ordered pairs.
  threeNums <- parallelize(sc, 1:3)
  crossed <- collectRDD(cartesian(threeNums, threeNums))
  expect_equal(
    sortKeyValueList(crossed),
    list(
      list(1, 1), list(1, 2), list(1, 3),
      list(2, 1), list(2, 2), list(2, 3),
      list(3, 1), list(3, 2), list(3, 3)
    )
  )

  # test case where one RDD is empty
  emptyRdd <- parallelize(sc, list())
  expect_equal(collectRDD(cartesian(threeNums, emptyRdd)), list())

  # Write a two-line text file to cross against.
  mockFile <- c("Spark is pretty.", "Spark is awesome.")
  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
  writeLines(mockFile, fileName)
  textRdd <- textFile(sc, fileName)

  # The text-file RDD crossed with itself.
  linePairs <- list(
    list("Spark is awesome.", "Spark is pretty."),
    list("Spark is awesome.", "Spark is awesome."),
    list("Spark is pretty.", "Spark is pretty."),
    list("Spark is pretty.", "Spark is awesome.")
  )
  crossed <- collectRDD(cartesian(textRdd, textRdd))
  expect_equal(sortKeyValueList(crossed), linePairs)

  # A parallelized RDD crossed with the text-file RDD.
  indices <- parallelize(sc, 0:1)
  crossed <- collectRDD(cartesian(indices, textRdd))
  expect_equal(
    sortKeyValueList(crossed),
    list(
      list(0, "Spark is pretty."),
      list(0, "Spark is awesome."),
      list(1, "Spark is pretty."),
      list(1, "Spark is awesome.")
    )
  )

  # The text-file RDD crossed with an identity-mapped copy of itself.
  mappedText <- map(textRdd, function(line) { line })
  crossed <- collectRDD(cartesian(textRdd, mappedText))
  expect_equal(sortKeyValueList(crossed), linePairs)

  unlink(fileName)
})
515
test_that("subtract() on RDDs", {
  numbers <- list(1, 1, 2, 2, 3, 4)
  numbersRdd <- parallelize(sc, numbers)

  # subtract by itself
  remaining <- collectRDD(subtract(numbersRdd, numbersRdd))
  expect_equal(remaining, list())

  # subtract by an empty RDD
  emptyRdd <- parallelize(sc, list())
  remaining <- collectRDD(subtract(numbersRdd, emptyRdd))
  expect_equal(
    as.list(sort(as.vector(remaining, mode = "integer"))),
    numbers
  )

  # Subtracting 2 and 4 leaves 1, 1 and 3.
  evensRdd <- parallelize(sc, list(2, 4))
  remaining <- collectRDD(subtract(numbersRdd, evensRdd))
  expect_equal(
    as.list(sort(as.vector(remaining, mode = "integer"))),
    list(1, 1, 3)
  )

  # Same scenario with character elements.
  lettersRdd <- parallelize(sc, list("a", "a", "b", "b", "c", "d"))
  removeRdd <- parallelize(sc, list("b", "d"))
  remaining <- collectRDD(subtract(lettersRdd, removeRdd))
  expect_equal(
    as.list(sort(as.vector(remaining, mode = "character"))),
    list("a", "a", "c")
  )
})
542
test_that("subtractByKey() on pairwise RDDs", {
  charPairs <- list(
    list("a", 1), list("b", 4),
    list("b", 5), list("a", 2)
  )
  charRdd <- parallelize(sc, charPairs)

  # subtractByKey by itself
  remaining <- collectRDD(subtractByKey(charRdd, charRdd))
  expect_equal(remaining, list())

  # subtractByKey by an empty RDD
  emptyRdd <- parallelize(sc, list())
  remaining <- collectRDD(subtractByKey(charRdd, emptyRdd))
  expect_equal(sortKeyValueList(remaining), sortKeyValueList(charPairs))

  # Only the "a" key is shared, so the "b" pairs remain.
  otherRdd <- parallelize(sc, list(list("a", 3), list("c", 1)))
  remaining <- collectRDD(subtractByKey(charRdd, otherRdd))
  expect_equal(remaining, list(list("b", 4), list("b", 5)))

  # Same scenario with numeric keys.
  numRdd <- parallelize(
    sc,
    list(list(1, 1), list(2, 4), list(2, 5), list(1, 2))
  )
  otherRdd <- parallelize(sc, list(list(1, 3), list(3, 1)))
  remaining <- collectRDD(subtractByKey(numRdd, otherRdd))
  expect_equal(remaining, list(list(2, 4), list(2, 5)))
})
571
test_that("intersection() on RDDs", {
  # intersection with self
  common <- collectRDD(intersection(rdd, rdd))
  expect_equal(sort(as.integer(common)), nums)

  # intersection with an empty RDD
  emptyRdd <- parallelize(sc, list())
  common <- collectRDD(intersection(rdd, emptyRdd))
  expect_equal(common, list())

  # Two partially overlapping RDDs share only 1, 2 and 3.
  leftRdd <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
  rightRdd <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
  common <- collectRDD(intersection(leftRdd, rightRdd))
  expect_equal(sort(as.integer(common)), 1:3)
})
587
test_that("join() on pairwise RDDs", {
  # Parallelize both sides and collect joinRDD(left, right, 2L).
  joinLists <- function(left, right) {
    leftRdd <- parallelize(sc, left)
    rightRdd <- parallelize(sc, right)
    collectRDD(joinRDD(leftRdd, rightRdd, 2L))
  }

  # Numeric keys, one shared key with two matches on the right.
  joined <- joinLists(
    list(list(1, 1), list(2, 4)),
    list(list(1, 2), list(1, 3))
  )
  expect_equal(
    sortKeyValueList(joined),
    sortKeyValueList(list(list(1, list(1, 2)), list(1, list(1, 3))))
  )

  # Character keys, same shape.
  joined <- joinLists(
    list(list("a", 1), list("b", 4)),
    list(list("a", 2), list("a", 3))
  )
  expect_equal(
    sortKeyValueList(joined),
    sortKeyValueList(list(list("a", list(1, 2)), list("a", list(1, 3))))
  )

  # Disjoint numeric keys: nothing joins.
  joined <- joinLists(
    list(list(1, 1), list(2, 2)),
    list(list(3, 3), list(4, 4))
  )
  expect_equal(joined, list())

  # Disjoint character keys: nothing joins.
  joined <- joinLists(
    list(list("a", 1), list("b", 2)),
    list(list("c", 3), list("d", 4))
  )
  expect_equal(joined, list())
})
611
test_that("leftOuterJoin() on pairwise RDDs", {
  # Parallelize both sides and collect leftOuterJoin(left, right, 2L).
  joinLists <- function(left, right) {
    leftRdd <- parallelize(sc, left)
    rightRdd <- parallelize(sc, right)
    collectRDD(leftOuterJoin(leftRdd, rightRdd, 2L))
  }

  # Numeric keys: unmatched left keys are paired with NULL.
  joined <- joinLists(
    list(list(1, 1), list(2, 4)),
    list(list(1, 2), list(1, 3))
  )
  expect_equal(
    sortKeyValueList(joined),
    sortKeyValueList(
      list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
    )
  )

  # Character keys, same shape.
  joined <- joinLists(
    list(list("a", 1), list("b", 4)),
    list(list("a", 2), list("a", 3))
  )
  expect_equal(
    sortKeyValueList(joined),
    sortKeyValueList(
      list(list("b", list(4, NULL)), list("a", list(1, 2)), list("a", list(1, 3)))
    )
  )

  # Disjoint numeric keys: every left pair survives with a NULL partner.
  joined <- joinLists(
    list(list(1, 1), list(2, 2)),
    list(list(3, 3), list(4, 4))
  )
  expect_equal(
    sortKeyValueList(joined),
    sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL))))
  )

  # Disjoint character keys.
  joined <- joinLists(
    list(list("a", 1), list("b", 2)),
    list(list("c", 3), list("d", 4))
  )
  expect_equal(
    sortKeyValueList(joined),
    sortKeyValueList(list(list("b", list(2, NULL)), list("a", list(1, NULL))))
  )
})
641
test_that("rightOuterJoin() on pairwise RDDs", {
  # Parallelize both sides and collect rightOuterJoin(left, right, 2L).
  joinLists <- function(left, right) {
    leftRdd <- parallelize(sc, left)
    rightRdd <- parallelize(sc, right)
    collectRDD(rightOuterJoin(leftRdd, rightRdd, 2L))
  }

  # Numeric keys: unmatched right keys are paired with NULL on the left.
  joined <- joinLists(
    list(list(1, 2), list(1, 3)),
    list(list(1, 1), list(2, 4))
  )
  expect_equal(
    sortKeyValueList(joined),
    sortKeyValueList(
      list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
    )
  )

  # Character keys, same shape.
  joined <- joinLists(
    list(list("a", 2), list("a", 3)),
    list(list("a", 1), list("b", 4))
  )
  expect_equal(
    sortKeyValueList(joined),
    sortKeyValueList(
      list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)))
    )
  )

  # Disjoint numeric keys: every right pair survives with a NULL partner.
  joined <- joinLists(
    list(list(1, 1), list(2, 2)),
    list(list(3, 3), list(4, 4))
  )
  expect_equal(
    sortKeyValueList(joined),
    sortKeyValueList(list(list(3, list(NULL, 3)), list(4, list(NULL, 4))))
  )

  # Disjoint character keys.
  joined <- joinLists(
    list(list("a", 1), list("b", 2)),
    list(list("c", 3), list("d", 4))
  )
  expect_equal(
    sortKeyValueList(joined),
    sortKeyValueList(list(list("d", list(NULL, 4)), list("c", list(NULL, 3))))
  )
})
668
test_that("fullOuterJoin() on pairwise RDDs", {
  # Parallelize both sides and collect fullOuterJoin(left, right, 2L).
  joinLists <- function(left, right) {
    leftRdd <- parallelize(sc, left)
    rightRdd <- parallelize(sc, right)
    collectRDD(fullOuterJoin(leftRdd, rightRdd, 2L))
  }

  # Numeric keys: keys missing on either side are paired with NULL.
  joined <- joinLists(
    list(list(1, 2), list(1, 3), list(3, 3)),
    list(list(1, 1), list(2, 4))
  )
  expect_equal(
    sortKeyValueList(joined),
    sortKeyValueList(list(
      list(1, list(2, 1)), list(1, list(3, 1)),
      list(2, list(NULL, 4)), list(3, list(3, NULL))
    ))
  )

  # Character keys, same shape.
  joined <- joinLists(
    list(list("a", 2), list("a", 3), list("c", 1)),
    list(list("a", 1), list("b", 4))
  )
  expect_equal(
    sortKeyValueList(joined),
    sortKeyValueList(list(
      list("b", list(NULL, 4)), list("a", list(2, 1)),
      list("a", list(3, 1)), list("c", list(1, NULL))
    ))
  )

  # Disjoint numeric keys: all four pairs survive, each with a NULL partner.
  joined <- joinLists(
    list(list(1, 1), list(2, 2)),
    list(list(3, 3), list(4, 4))
  )
  expect_equal(
    sortKeyValueList(joined),
    sortKeyValueList(list(
      list(1, list(1, NULL)), list(2, list(2, NULL)),
      list(3, list(NULL, 3)), list(4, list(NULL, 4))
    ))
  )

  # Disjoint character keys.
  joined <- joinLists(
    list(list("a", 1), list("b", 2)),
    list(list("c", 3), list("d", 4))
  )
  expect_equal(
    sortKeyValueList(joined),
    sortKeyValueList(list(
      list("a", list(1, NULL)), list("b", list(2, NULL)),
      list("d", list(NULL, 4)), list("c", list(NULL, 3))
    ))
  )
})
699
test_that("sortByKey() on pairwise RDDs", {
  # Turn a number into an (n, n) pair.
  selfPair <- function(n) {
    list(n, n)
  }
  numPairs <- lapply(nums, selfPair)

  # Descending sort of the ascending fixture.
  descending <- sortByKey(map(rdd, selfPair), ascending = FALSE)
  expect_equal(
    collectRDD(descending),
    sortKeyValueList(numPairs, decreasing = TRUE)
  )

  # Default sort of a descending input.
  reversedRdd <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
  ascending <- sortByKey(map(reversedRdd, selfPair))
  expect_equal(collectRDD(ascending), numPairs)

  # sort by string keys
  charPairs <- list(
    list("a", 1), list("b", 2), list("1", 3), list("d", 4), list("2", 5)
  )
  sortedCharPairs <- list(
    list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)
  )
  twoSlices <- parallelize(sc, charPairs, 2L)
  expect_equal(collectRDD(sortByKey(twoSlices)), sortedCharPairs)

  # test on the boundary cases

  # boundary case 1: the RDD to be sorted has only 1 partition
  oneSlice <- parallelize(sc, charPairs, 1L)
  expect_equal(collectRDD(sortByKey(oneSlice)), sortedCharPairs)

  # boundary case 2: the sorted RDD has only 1 partition
  twoSlicesAgain <- parallelize(sc, charPairs, 2L)
  intoOne <- sortByKey(twoSlicesAgain, numPartitions = 1L)
  expect_equal(collectRDD(intoOne), sortedCharPairs)

  # boundary case 3: the RDD to be sorted has only 1 element
  singlePair <- list(list("a", 1))
  singleRdd <- parallelize(sc, singlePair, 2L)
  expect_equal(collectRDD(sortByKey(singleRdd)), singlePair)

  # boundary case 4: the RDD to be sorted has 0 element
  noPairs <- list()
  emptyRdd <- parallelize(sc, noPairs, 2L)
  expect_equal(collectRDD(sortByKey(emptyRdd)), noPairs)
})
748
test_that("collectAsMap() on a pairwise RDD", {
  # Collect a list of pairs as a named list.
  asMap <- function(pairs) {
    collectAsMap(parallelize(sc, pairs))
  }

  # Whole-number keys become the names "1" and "3".
  expect_equal(asMap(list(list(1, 2), list(3, 4))), list(`1` = 2, `3` = 4))

  # Character keys are used as names directly.
  expect_equal(asMap(list(list("a", 1), list("b", 2))), list(a = 1, b = 2))

  # Fractional keys keep their decimal representation in the names.
  expect_equal(
    asMap(list(list(1.1, 2.2), list(1.2, 2.4))),
    list(`1.1` = 2.2, `1.2` = 2.4)
  )

  # Character values under numeric keys.
  expect_equal(
    asMap(list(list(1, "a"), list(2, "b"))),
    list(`1` = "a", `2` = "b")
  )
})
766
test_that("show()", {
  # The printed form should name the RDD class, its id and its creation site.
  shownPattern <- "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+"
  wrapped <- parallelize(sc, list(1:10))
  expect_output(showRDD(wrapped), shownPattern)
})
771
test_that("sampleByKey() on pairwise RDDs", {
  # Character keys, second argument FALSE: even numbers are keyed "a" and
  # sampled at 0.2, odd numbers are keyed "b" and sampled at 0.1.
  rdd <- parallelize(sc, 1:2000)
  pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list("a", x) else list("b", x) })
  fractions <- list(a = 0.2, b = 0.1)
  sampled <- sampleByKey(pairsRDD, FALSE, fractions, 1618L)

  # Run each lookup() once and reuse the result instead of repeating the
  # same action for every expectation.
  aVals <- lookup(sampled, "a")
  bVals <- lookup(sampled, "b")
  expect_true(100 < length(aVals) && 300 > length(aVals))
  expect_true(50 < length(bVals) && 150 > length(bVals))
  expect_true(aVals[which.min(aVals)] >= 0)
  expect_true(aVals[which.max(aVals)] <= 2000)
  expect_true(bVals[which.min(bVals)] >= 0)
  expect_true(bVals[which.max(bVals)] <= 2000)

  # Numeric keys, second argument TRUE, same fractions and size bounds.
  rdd <- parallelize(sc, 1:2000)
  pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list(2, x) else list(3, x) })
  fractions <- list(`2` = 0.2, `3` = 0.1)
  sampled <- sampleByKey(pairsRDD, TRUE, fractions, 1618L)

  evenVals <- lookup(sampled, 2)
  oddVals <- lookup(sampled, 3)
  expect_true(100 < length(evenVals) && 300 > length(evenVals))
  expect_true(50 < length(oddVals) && 150 > length(oddVals))
  expect_true(evenVals[which.min(evenVals)] >= 0)
  expect_true(evenVals[which.max(evenVals)] <= 2000)
  expect_true(oddVals[which.min(oddVals)] >= 0)
  expect_true(oddVals[which.max(oddVals)] <= 2000)
})
795
test_that("Test correct concurrency of RRDD.compute()", {
  # Zip the Java-side RDD of an identity-mapped RDD with itself through direct
  # JVM method calls and check that all 1000 elements are counted.
  manySlices <- parallelize(sc, 1:1000, 100)
  identityMapped <- lapply(manySlices, function(elem) { elem })
  jrdd <- getJRDD(identityMapped, "row")
  zipped <- callJMethod(jrdd, "zip", jrdd)
  expect_equal(callJMethod(zipped, "count"), 1000)
})
803
# Stop the session that was started at the top of this file.
sparkR.session.stop()
805