#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

context("basic RDD functions")

# JavaSparkContext handle: start a SparkR session (Hive support is not needed
# here) and fetch the underlying JavaSparkContext that the RDD API operates on.
sparkSession <- sparkR.session(enableHiveSupport = FALSE)
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)

# Data shared by many of the tests below.
# `rdd` holds 1..10 in two partitions; the partition-level tests below expect
# per-partition sums of 15 and 40, i.e. partitions 1..5 and 6..10.
nums <- 1:10
rdd <- parallelize(sc, nums, 2L)

# Key/value pairs with integer keys and numeric values, also in two partitions.
intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
intRdd <- parallelize(sc, intPairs, 2L)

test_that("get number of partitions in RDD", {
  expect_equal(getNumPartitionsRDD(rdd), 2)
  expect_equal(getNumPartitionsRDD(intRdd), 2)
})

test_that("first on RDD", {
  expect_equal(firstRDD(rdd), 1)
  newrdd <- lapply(rdd, function(x) x + 1)
  expect_equal(firstRDD(newrdd), 2)
})

test_that("count and length on RDD", {
  expect_equal(countRDD(rdd), 10)
  expect_equal(lengthRDD(rdd), 10)
})

test_that("count by values and keys", {
  mods <- lapply(rdd, function(x) { x %% 3 })
  actual <- countByValue(mods)
  expected <- list(list(0, 3L), list(1, 4L), list(2, 3L))
  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))

  actual <- countByKey(intRdd)
  expected <- list(list(2L, 2L), list(1L, 2L))
  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
})

test_that("lapply on RDD", {
  multiples <- lapply(rdd, function(x) { 2 * x })
  actual <- collectRDD(multiples)
  expect_equal(actual, as.list(nums * 2))
})

test_that("lapplyPartition on RDD", {
  sums <- lapplyPartition(rdd, function(part) { sum(unlist(part)) })
  actual <- collectRDD(sums)
  expect_equal(actual, list(15, 40))
})

test_that("mapPartitions on RDD", {
  sums <- mapPartitions(rdd, function(part) { sum(unlist(part)) })
  actual <- collectRDD(sums)
  expect_equal(actual, list(15, 40))
})

test_that("flatMap() on RDDs", {
  flat <- flatMap(intRdd, function(x) { list(x, x) })
  actual <- collectRDD(flat)
  expect_equal(actual, rep(intPairs, each = 2))
})

test_that("filterRDD on RDD", {
  filtered.rdd <- filterRDD(rdd, function(x) { x %% 2 == 0 })
  actual <- collectRDD(filtered.rdd)
  expect_equal(actual, list(2, 4, 6, 8, 10))

  filtered.rdd <- Filter(function(x) { x[[2]] < 0 }, intRdd)
  actual <- collectRDD(filtered.rdd)
  expect_equal(actual, list(list(1L, -1)))

  # Filter out all elements.
  filtered.rdd <- filterRDD(rdd, function(x) { x > 10 })
  actual <- collectRDD(filtered.rdd)
  expect_equal(actual, list())
})

test_that("lookup on RDD", {
  vals <- lookup(intRdd, 1L)
  expect_equal(vals, list(-1, 200))

  # A key that is not present yields an empty list.
  vals <- lookup(intRdd, 3L)
  expect_equal(vals, list())
})

test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
  rdd2 <- rdd
  # Chain twelve partition-level transformations. Each step multiplies every
  # element by its partition index and adds the current loop counter `i`.
  for (i in 1:12) {
    rdd2 <- lapplyPartitionsWithIndex(
      rdd2, function(partIndex, part) {
        as.list(unlist(part) * partIndex + i)
      })
  }
  rdd2 <- lapply(rdd2, function(x) x + x)
  actual <- collectRDD(rdd2)
  # Partition 0 ends at 0 * x + 12 = 12, doubled to 24. Partition 1 adds
  # 1 + 2 + ... + 12 = 78 to each of 6..10, then doubles (168..176).
  expected <- list(24, 24, 24, 24, 24,
                   168, 170, 172, 174, 176)
  expect_equal(actual, expected)
})

test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkpoint()", {
  # RDD
  rdd2 <- rdd
  # PipelinedRDD
  rdd2 <- lapplyPartitionsWithIndex(
    rdd2,
    function(partIndex, part) {
      as.list(unlist(part) * partIndex)
    })

  cacheRDD(rdd2)
  expect_true(rdd2@env$isCached)
  # A further transformation produces a new RDD that is not itself cached.
  rdd2 <- lapply(rdd2, function(x) x)
  expect_false(rdd2@env$isCached)

  unpersistRDD(rdd2)
  expect_false(rdd2@env$isCached)

  persistRDD(rdd2, "MEMORY_AND_DISK")
  expect_true(rdd2@env$isCached)
  rdd2 <- lapply(rdd2, function(x) x)
  expect_false(rdd2@env$isCached)

  unpersistRDD(rdd2)
  expect_false(rdd2@env$isCached)

  tempDir <- tempfile(pattern = "checkpoint")
  setCheckpointDir(sc, tempDir)
  checkpoint(rdd2)
  expect_true(rdd2@env$isCheckpointed)

  rdd2 <- lapply(rdd2, function(x) x)
  expect_false(rdd2@env$isCached)
  expect_false(rdd2@env$isCheckpointed)

  # make sure the data is collectable
  collectRDD(rdd2)

  # tempDir is used as a checkpoint *directory*; unlink() silently skips
  # directories unless recursive = TRUE, so the checkpoint data would leak.
  unlink(tempDir, recursive = TRUE)
})

test_that("reduce on RDD", {
  # Avoid naming the result `sum`, which would shadow base::sum.
  total <- reduce(rdd, "+")
  expect_equal(total, 55)

  # Also test with an inline function
  totalInline <- reduce(rdd, function(x, y) { x + y })
  expect_equal(totalInline, 55)
})

test_that("lapply with dependency", {
  # `fa` is a free variable of the closure and must be shipped with it.
  fa <- 5
  multiples <- lapply(rdd, function(x) { fa * x })
  actual <- collectRDD(multiples)

  expect_equal(actual, as.list(nums * 5))
})

test_that("lapplyPartitionsWithIndex on RDDs", {
  func <- function(partIndex, part) { list(partIndex, Reduce("+", part)) }
  actual <- collectRDD(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE)
  expect_equal(actual, list(list(0, 15), list(1, 40)))

  pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
  # Odd keys go to partition 0, even keys to partition 1.
  partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 }
  mkTup <- function(partIndex, part) { list(partIndex, part) }
  actual <- collectRDD(
    lapplyPartitionsWithIndex(
      partitionByRDD(pairsRDD, 2L, partitionByParity),
      mkTup),
    flatten = FALSE)
  expect_equal(actual, list(list(0, list(list(1, 2), list(3, 4))),
                            list(1, list(list(4, 8)))))
})

test_that("sampleRDD() on RDDs", {
  # A fraction of 1.0 without replacement keeps every element.
  expect_equal(unlist(collectRDD(sampleRDD(rdd, FALSE, 1.0, 2014L))), nums)
})

test_that("takeSample() on RDDs", {
  # ported from RDDSuite.scala, modified seeds
  data <- parallelize(sc, 1:100, 2L)
  for (seed in 4:5) {
    s <- takeSample(data, FALSE, 20L, seed)
    expect_equal(length(s), 20L)
    expect_equal(length(unique(s)), 20L)
    for (elem in s) {
      expect_true(elem >= 1 && elem <= 100)
    }
  }
  # Without replacement, asking for more than the RDD holds returns everything.
  for (seed in 4:5) {
    s <- takeSample(data, FALSE, 200L, seed)
    expect_equal(length(s), 100L)
    expect_equal(length(unique(s)), 100L)
    for (elem in s) {
      expect_true(elem >= 1 && elem <= 100)
    }
  }
  for (seed in 4:5) {
    s <- takeSample(data, TRUE, 20L, seed)
    expect_equal(length(s), 20L)
    for (elem in s) {
      expect_true(elem >= 1 && elem <= 100)
    }
  }
  for (seed in 4:5) {
    s <- takeSample(data, TRUE, 100L, seed)
    expect_equal(length(s), 100L)
    # Drawing 100 of 100 with replacement and getting all distinct elements is
    # astronomically unlikely, so expect fewer than 100 unique values.
    expect_true(length(unique(s)) < 100L)
  }
  for (seed in 4:5) {
    s <- takeSample(data, TRUE, 200L, seed)
    expect_equal(length(s), 200L)
    # Seeing every one of the 100 values in 200 draws is still quite unlikely,
    # so expect fewer than 100 unique values.
    expect_true(length(unique(s)) < 100L)
  }
})

test_that("mapValues() on pairwise RDDs", {
  multiples <- mapValues(intRdd, function(x) { x * 2 })
  actual <- collectRDD(multiples)
  expected <- lapply(intPairs, function(x) {
    list(x[[1]], x[[2]] * 2)
  })
  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
})

test_that("flatMapValues() on pairwise RDDs", {
  l <- parallelize(sc, list(list(1, c(1, 2)), list(2, c(3, 4))))
  actual <- collectRDD(flatMapValues(l, function(x) { x }))
  expect_equal(actual, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))

  # Generate x to x+1 for every value
  actual <- collectRDD(flatMapValues(intRdd, function(x) { x:(x + 1) }))
  expect_equal(actual,
               list(list(1L, -1), list(1L, 0), list(2L, 100), list(2L, 101),
                    list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201)))
})

test_that("reduceByKeyLocally() on PairwiseRDDs", {
  pairs <- parallelize(sc, list(list(1, 2), list(1.1, 3), list(1, 4)), 2L)
  actual <- reduceByKeyLocally(pairs, "+")
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(list(list(1, 6), list(1.1, 3))))

  # Mixed key types (character and numeric) across four partitions.
  pairs <- parallelize(sc, list(list("abc", 1.2), list(1.1, 0), list("abc", 1.3),
                                list("bb", 5)), 4L)
  actual <- reduceByKeyLocally(pairs, "+")
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(list(list("abc", 2.5), list(1.1, 0), list("bb", 5))))
})

test_that("distinct() on RDDs", {
  nums.rep2 <- rep(1:10, 2)
  rdd.rep2 <- parallelize(sc, nums.rep2, 2L)
  uniques <- distinctRDD(rdd.rep2)
  actual <- sort(unlist(collectRDD(uniques)))
  expect_equal(actual, nums)
})
test_that("maximum() on RDDs", {
  # Compare directly rather than binding to `max`, which shadows base::max.
  expect_equal(maximum(rdd), 10)
})

test_that("minimum() on RDDs", {
  expect_equal(minimum(rdd), 1)
})

test_that("sumRDD() on RDDs", {
  expect_equal(sumRDD(rdd), 55)
})

test_that("keyBy on RDDs", {
  func <- function(x) { x * x }
  keyed <- keyBy(rdd, func)
  actual <- collectRDD(keyed)
  expect_equal(actual, lapply(nums, function(x) { list(func(x), x) }))
})

test_that("repartition/coalesce on RDDs", {
  rdd <- parallelize(sc, 1:20, 4L) # each partition contains 5 elements

  # repartition
  r1 <- repartitionRDD(rdd, 2)
  expect_equal(getNumPartitionsRDD(r1), 2L)
  count <- length(collectPartition(r1, 0L))
  expect_true(count >= 8 && count <= 12)

  r2 <- repartitionRDD(rdd, 6)
  expect_equal(getNumPartitionsRDD(r2), 6L)
  count <- length(collectPartition(r2, 0L))
  expect_true(count >= 0 && count <= 4)

  # coalesce
  r3 <- coalesceRDD(rdd, 1)
  expect_equal(getNumPartitionsRDD(r3), 1L)
  count <- length(collectPartition(r3, 0L))
  expect_equal(count, 20)
})

test_that("sortBy() on RDDs", {
  sortedRdd <- sortBy(rdd, function(x) { x * x }, ascending = FALSE)
  actual <- collectRDD(sortedRdd)
  expect_equal(actual, as.list(sort(nums, decreasing = TRUE)))

  rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
  sortedRdd2 <- sortBy(rdd2, function(x) { x * x })
  actual <- collectRDD(sortedRdd2)
  expect_equal(actual, as.list(nums))
})

test_that("takeOrdered() on RDDs", {
  l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
  rdd <- parallelize(sc, l)
  actual <- takeOrdered(rdd, 6L)
  expect_equal(actual, as.list(sort(unlist(l)))[1:6])

  l <- list("e", "d", "c", "d", "a")
  rdd <- parallelize(sc, l)
  actual <- takeOrdered(rdd, 3L)
  expect_equal(actual, as.list(sort(unlist(l)))[1:3])
})

test_that("top() on RDDs", {
  l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
  rdd <- parallelize(sc, l)
  actual <- top(rdd, 6L)
  expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:6])

  l <- list("e", "d", "c", "d", "a")
  rdd <- parallelize(sc, l)
  actual <- top(rdd, 3L)
  expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:3])
})

test_that("fold() on RDDs", {
  actual <- fold(rdd, 0, "+")
  expect_equal(actual, Reduce("+", nums, 0))

  # Folding an empty RDD returns the zero value.
  rdd <- parallelize(sc, list())
  actual <- fold(rdd, 0, "+")
  expect_equal(actual, 0)
})

test_that("aggregateRDD() on RDDs", {
  rdd <- parallelize(sc, list(1, 2, 3, 4))
  # Accumulator is list(running sum, running count).
  zeroValue <- list(0, 0)
  seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
  combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
  actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
  expect_equal(actual, list(10, 4))

  rdd <- parallelize(sc, list())
  actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
  expect_equal(actual, list(0, 0))
})

test_that("zipWithUniqueId() on RDDs", {
  rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
  actual <- collectRDD(zipWithUniqueId(rdd))
  expected <- list(list("a", 0), list("b", 1), list("c", 4),
                   list("d", 2), list("e", 5))
  expect_equal(actual, expected)

  rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
  actual <- collectRDD(zipWithUniqueId(rdd))
  expected <- list(list("a", 0), list("b", 1), list("c", 2),
                   list("d", 3), list("e", 4))
  expect_equal(actual, expected)
})

test_that("zipWithIndex() on RDDs", {
  rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
  actual <- collectRDD(zipWithIndex(rdd))
  expected <- list(list("a", 0), list("b", 1), list("c", 2),
                   list("d", 3), list("e", 4))
  expect_equal(actual, expected)

  rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
  actual <- collectRDD(zipWithIndex(rdd))
  expected <- list(list("a", 0), list("b", 1), list("c", 2),
                   list("d", 3), list("e", 4))
  expect_equal(actual, expected)
})

test_that("glom() on RDD", {
  rdd <- parallelize(sc, as.list(1:4), 2L)
  actual <- collectRDD(glom(rdd))
  expect_equal(actual, list(list(1, 2), list(3, 4)))
})

test_that("keys() on RDDs", {
  # Named keyRdd so the local does not shadow the keys() generic.
  keyRdd <- keys(intRdd)
  actual <- collectRDD(keyRdd)
  expect_equal(actual, lapply(intPairs, function(x) { x[[1]] }))
})

test_that("values() on RDDs", {
  valueRdd <- values(intRdd)
  actual <- collectRDD(valueRdd)
  expect_equal(actual, lapply(intPairs, function(x) { x[[2]] }))
})

test_that("pipeRDD() on RDDs", {
  actual <- collectRDD(pipeRDD(rdd, "more"))
  expected <- as.list(as.character(1:10))
  expect_equal(actual, expected)

  trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n"))
  actual <- collectRDD(pipeRDD(trailed.rdd, "sort"))
  expected <- list("", "1", "2", "3")
  expect_equal(actual, expected)

  # The external command runs per partition, so each partition is sorted
  # independently: 9:5 -> 5..9 and 4:0 -> 0..4.
  rev.nums <- 9:0
  rev.rdd <- parallelize(sc, rev.nums, 2L)
  actual <- collectRDD(pipeRDD(rev.rdd, "sort"))
  expected <- as.list(as.character(c(5:9, 0:4)))
  expect_equal(actual, expected)
})

test_that("zipRDD() on RDDs", {
  rdd1 <- parallelize(sc, 0:4, 2)
  rdd2 <- parallelize(sc, 1000:1004, 2)
  actual <- collectRDD(zipRDD(rdd1, rdd2))
  expect_equal(actual,
               list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004)))

  mockFile <- c("Spark is pretty.", "Spark is awesome.")
  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
  writeLines(mockFile, fileName)

  rdd <- textFile(sc, fileName, 1)
  actual <- collectRDD(zipRDD(rdd, rdd))
  expected <- lapply(mockFile, function(x) { list(x, x) })
  expect_equal(actual, expected)

  rdd1 <- parallelize(sc, 0:1, 1)
  actual <- collectRDD(zipRDD(rdd1, rdd))
  expected <- lapply(0:1, function(x) { list(x, mockFile[x + 1]) })
  expect_equal(actual, expected)

  rdd1 <- map(rdd, function(x) { x })
  actual <- collectRDD(zipRDD(rdd, rdd1))
  expected <- lapply(mockFile, function(x) { list(x, x) })
  expect_equal(actual, expected)

  unlink(fileName)
})

test_that("cartesian() on RDDs", {
  rdd <- parallelize(sc, 1:3)
  actual <- collectRDD(cartesian(rdd, rdd))
  expect_equal(sortKeyValueList(actual),
               list(
                 list(1, 1), list(1, 2), list(1, 3),
                 list(2, 1), list(2, 2), list(2, 3),
                 list(3, 1), list(3, 2), list(3, 3)))

  # test case where one RDD is empty
  emptyRdd <- parallelize(sc, list())
  actual <- collectRDD(cartesian(rdd, emptyRdd))
  expect_equal(actual, list())

  mockFile <- c("Spark is pretty.", "Spark is awesome.")
  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
  writeLines(mockFile, fileName)

  rdd <- textFile(sc, fileName)
  actual <- collectRDD(cartesian(rdd, rdd))
  expected <- list(
    list("Spark is awesome.", "Spark is pretty."),
    list("Spark is awesome.", "Spark is awesome."),
    list("Spark is pretty.", "Spark is pretty."),
    list("Spark is pretty.", "Spark is awesome."))
  expect_equal(sortKeyValueList(actual), expected)

  rdd1 <- parallelize(sc, 0:1)
  actual <- collectRDD(cartesian(rdd1, rdd))
  expect_equal(sortKeyValueList(actual),
               list(
                 list(0, "Spark is pretty."),
                 list(0, "Spark is awesome."),
                 list(1, "Spark is pretty."),
                 list(1, "Spark is awesome.")))

  rdd1 <- map(rdd, function(x) { x })
  actual <- collectRDD(cartesian(rdd, rdd1))
  expect_equal(sortKeyValueList(actual), expected)

  unlink(fileName)
})

test_that("subtract() on RDDs", {
  l <- list(1, 1, 2, 2, 3, 4)
  rdd1 <- parallelize(sc, l)

  # subtract by itself
  actual <- collectRDD(subtract(rdd1, rdd1))
  expect_equal(actual, list())

  # subtract by an empty RDD
  rdd2 <- parallelize(sc, list())
  actual <- collectRDD(subtract(rdd1, rdd2))
  expect_equal(as.list(sort(as.vector(actual, mode = "integer"))),
               l)

  # Duplicates in the left RDD are kept when they are not subtracted.
  rdd2 <- parallelize(sc, list(2, 4))
  actual <- collectRDD(subtract(rdd1, rdd2))
  expect_equal(as.list(sort(as.vector(actual, mode = "integer"))),
               list(1, 1, 3))

  l <- list("a", "a", "b", "b", "c", "d")
  rdd1 <- parallelize(sc, l)
  rdd2 <- parallelize(sc, list("b", "d"))
  actual <- collectRDD(subtract(rdd1, rdd2))
  expect_equal(as.list(sort(as.vector(actual, mode = "character"))),
               list("a", "a", "c"))
})

test_that("subtractByKey() on pairwise RDDs", {
  l <- list(list("a", 1), list("b", 4),
            list("b", 5), list("a", 2))
  rdd1 <- parallelize(sc, l)

  # subtractByKey by itself
  actual <- collectRDD(subtractByKey(rdd1, rdd1))
  expect_equal(actual, list())

  # subtractByKey by an empty RDD
  rdd2 <- parallelize(sc, list())
  actual <- collectRDD(subtractByKey(rdd1, rdd2))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(l))

  rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
  actual <- collectRDD(subtractByKey(rdd1, rdd2))
  expect_equal(actual,
               list(list("b", 4), list("b", 5)))

  l <- list(list(1, 1), list(2, 4),
            list(2, 5), list(1, 2))
  rdd1 <- parallelize(sc, l)
  rdd2 <- parallelize(sc, list(list(1, 3), list(3, 1)))
  actual <- collectRDD(subtractByKey(rdd1, rdd2))
  expect_equal(actual,
               list(list(2, 4), list(2, 5)))
})

test_that("intersection() on RDDs", {
  # intersection with self
  actual <- collectRDD(intersection(rdd, rdd))
  expect_equal(sort(as.integer(actual)), nums)

  # intersection with an empty RDD
  emptyRdd <- parallelize(sc, list())
  actual <- collectRDD(intersection(rdd, emptyRdd))
  expect_equal(actual, list())

  rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
  rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
  actual <- collectRDD(intersection(rdd1, rdd2))
  expect_equal(sort(as.integer(actual)), 1:3)
})

test_that("join() on pairwise RDDs", {
  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
  rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
  actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(list(list(1, list(1, 2)), list(1, list(1, 3)))))

  rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4)))
  rdd2 <- parallelize(sc, list(list("a", 2), list("a", 3)))
  actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(list(list("a", list(1, 2)), list("a", list(1, 3)))))

  # No shared keys: the inner join is empty.
  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
  rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
  actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
  expect_equal(actual, list())

  rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
  rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
  actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
  expect_equal(actual, list())
})

test_that("leftOuterJoin() on pairwise RDDs", {
  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
  rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
  actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(expected))

  rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4)))
  rdd2 <- parallelize(sc, list(list("a", 2), list("a", 3)))
  actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list("b", list(4, NULL)), list("a", list(1, 2)), list("a", list(1, 3)))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(expected))

  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
  rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
  actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list(1, list(1, NULL)), list(2, list(2, NULL)))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(expected))

  rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
  rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
  actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list("b", list(2, NULL)), list("a", list(1, NULL)))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(expected))
})

test_that("rightOuterJoin() on pairwise RDDs", {
  rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
  rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
  actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))

  rdd1 <- parallelize(sc, list(list("a", 2), list("a", 3)))
  rdd2 <- parallelize(sc, list(list("a", 1), list("b", 4)))
  actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(expected))

  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
  rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
  actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(list(list(3, list(NULL, 3)), list(4, list(NULL, 4)))))

  rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
  rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
  actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(list(list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
})

test_that("fullOuterJoin() on pairwise RDDs", {
  rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
  rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
  actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list(1, list(2, 1)), list(1, list(3, 1)),
                   list(2, list(NULL, 4)), list(3, list(3, NULL)))
  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))

  rdd1 <- parallelize(sc, list(list("a", 2), list("a", 3), list("c", 1)))
  rdd2 <- parallelize(sc, list(list("a", 1), list("b", 4)))
  actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)),
                   list("a", list(3, 1)), list("c", list(1, NULL)))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(expected))

  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
  rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
  actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)),
                                     list(3, list(NULL, 3)), list(4, list(NULL, 4)))))

  rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
  rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
  actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)),
                                     list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
})

test_that("sortByKey() on pairwise RDDs", {
  numPairsRdd <- map(rdd, function(x) { list(x, x) })
  sortedRdd <- sortByKey(numPairsRdd, ascending = FALSE)
  actual <- collectRDD(sortedRdd)
  numPairs <- lapply(nums, function(x) { list(x, x) })
  expect_equal(actual, sortKeyValueList(numPairs, decreasing = TRUE))

  rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
  numPairsRdd2 <- map(rdd2, function(x) { list(x, x) })
  sortedRdd2 <- sortByKey(numPairsRdd2)
  actual <- collectRDD(sortedRdd2)
  expect_equal(actual, numPairs)

  # sort by string keys
  l <- list(list("a", 1), list("b", 2), list("1", 3), list("d", 4), list("2", 5))
  rdd3 <- parallelize(sc, l, 2L)
  sortedRdd3 <- sortByKey(rdd3)
  actual <- collectRDD(sortedRdd3)
  expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))

  # test on the boundary cases

  # boundary case 1: the RDD to be sorted has only 1 partition
  rdd4 <- parallelize(sc, l, 1L)
  sortedRdd4 <- sortByKey(rdd4)
  actual <- collectRDD(sortedRdd4)
  expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))

  # boundary case 2: the sorted RDD has only 1 partition
  rdd5 <- parallelize(sc, l, 2L)
  sortedRdd5 <- sortByKey(rdd5, numPartitions = 1L)
  actual <- collectRDD(sortedRdd5)
  expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))

  # boundary case 3: the RDD to be sorted has only 1 element
  l2 <- list(list("a", 1))
  rdd6 <- parallelize(sc, l2, 2L)
  sortedRdd6 <- sortByKey(rdd6)
  actual <- collectRDD(sortedRdd6)
  expect_equal(actual, l2)

  # boundary case 4: the RDD to be sorted has 0 element
  l3 <- list()
  rdd7 <- parallelize(sc, l3, 2L)
  sortedRdd7 <- sortByKey(rdd7)
  actual <- collectRDD(sortedRdd7)
  expect_equal(actual, l3)
})

test_that("collectAsMap() on a pairwise RDD", {
  # Keys of any type come back as the names of the resulting list.
  rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
  vals <- collectAsMap(rdd)
  expect_equal(vals, list(`1` = 2, `3` = 4))

  rdd <- parallelize(sc, list(list("a", 1), list("b", 2)))
  vals <- collectAsMap(rdd)
  expect_equal(vals, list(a = 1, b = 2))

  rdd <- parallelize(sc, list(list(1.1, 2.2), list(1.2, 2.4)))
  vals <- collectAsMap(rdd)
  expect_equal(vals, list(`1.1` = 2.2, `1.2` = 2.4))

  rdd <- parallelize(sc, list(list(1, "a"), list(2, "b")))
  vals <- collectAsMap(rdd)
  expect_equal(vals, list(`1` = "a", `2` = "b"))
})

test_that("show()", {
  rdd <- parallelize(sc, list(1:10))
  expect_output(showRDD(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
})

test_that("sampleByKey() on pairwise RDDs", {
  # Without replacement: 1000 even numbers under key "a" sampled at ~20%,
  # 1000 odd numbers under key "b" sampled at ~10%.
  rdd <- parallelize(sc, 1:2000)
  pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list("a", x) else list("b", x) })
  fractions <- list(a = 0.2, b = 0.1)
  # Named `sampled` to avoid shadowing base::sample.
  sampled <- sampleByKey(pairsRDD, FALSE, fractions, 1618L)
  # Fetch each key's sampled values once and reuse them for every assertion.
  sampledA <- unlist(lookup(sampled, "a"))
  sampledB <- unlist(lookup(sampled, "b"))
  expect_true(length(sampledA) > 100 && length(sampledA) < 300)
  expect_true(length(sampledB) > 50 && length(sampledB) < 150)
  expect_true(min(sampledA) >= 0)
  expect_true(max(sampledA) <= 2000)
  expect_true(min(sampledB) >= 0)
  expect_true(max(sampledB) <= 2000)

  # With replacement, using numeric keys 2 (even) and 3 (odd).
  rdd <- parallelize(sc, 1:2000)
  pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list(2, x) else list(3, x) })
  fractions <- list(`2` = 0.2, `3` = 0.1)
  sampled <- sampleByKey(pairsRDD, TRUE, fractions, 1618L)
  sampled2 <- unlist(lookup(sampled, 2))
  sampled3 <- unlist(lookup(sampled, 3))
  expect_true(length(sampled2) > 100 && length(sampled2) < 300)
  expect_true(length(sampled3) > 50 && length(sampled3) < 150)
  expect_true(min(sampled2) >= 0)
  expect_true(max(sampled2) <= 2000)
  expect_true(min(sampled3) >= 0)
  expect_true(max(sampled3) <= 2000)
})

test_that("Test correct concurrency of RRDD.compute()", {
  # Zipping a JVM-side RRDD with itself evaluates it twice within each task.
  rdd <- parallelize(sc, 1:1000, 100)
  jrdd <- getJRDD(lapply(rdd, function(x) { x }), "row")
  zrdd <- callJMethod(jrdd, "zip", jrdd)
  count <- callJMethod(zrdd, "count")
  expect_equal(count, 1000)
})

# Tear down the session started at the top of this file.
sparkR.session.stop()