#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

library(testthat)

context("SparkSQL functions")

# Utility function for easily checking the values of a StructField.
#
# actual:           the object under test, expected to be a "structField"
# expectedName:     value expected from actual$name()
# expectedType:     value expected from actual$dataType.toString()
# expectedNullable: value expected from actual$nullable()
checkStructField <- function(actual, expectedName, expectedType, expectedNullable) {
  # expect_is() is the class check used by the tests in this file
  expect_is(actual, "structField")
  expect_equal(actual$name(), expectedName)
  expect_equal(actual$dataType.toString(), expectedType)
  expect_equal(actual$nullable(), expectedNullable)
}

# Declares the encoding of the character vector `s` to be UTF-8 and returns it.
# Only the encoding mark is set; the bytes of `s` are not converted.
markUtf8 <- function(s) {
  Encoding(s) <- "UTF-8"
  s
}

# Makes a Hive-enabled test session the active SparkR session.
#
# sc: the Java Spark context handle; its "sc" method is called to obtain the
#     object passed to the TestHiveContext constructor.
#
# The session that was active before the call is stored in .sparkREnv under
# ".prevSparkRsession" so that it can be put back afterwards. If the
# TestHiveContext cannot be constructed, the calling test is skipped.
#
# Returns the Hive session handle.
setHiveContext <- function(sc) {
  if (exists(".testHiveSession", envir = .sparkREnv)) {
    hiveSession <- get(".testHiveSession", envir = .sparkREnv)
  } else {
    # initialize once and reuse
    # NOTE(review): nothing in this function stores ".testHiveSession" in
    # .sparkREnv, so unless it is assigned elsewhere a new TestHiveContext is
    # built on every call - confirm whether caching was intended.
    ssc <- callJMethod(sc, "sc")
    hiveCtx <- tryCatch({
      newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc, FALSE)
    },
    error = function(err) {
      skip("Hive is not built with SparkSQL, skipped")
    })
    hiveSession <- callJMethod(hiveCtx, "sparkSession")
  }
  previousSession <- get(".sparkRsession", envir = .sparkREnv)
  assign(".sparkRsession", hiveSession, envir = .sparkREnv)
  assign(".prevSparkRsession", previousSession, envir = .sparkREnv)
  hiveSession
}

# Restores the SparkR session that was active before the Hive test session
# was installed: reads ".prevSparkRsession" from .sparkREnv, makes it the
# current ".sparkRsession" again and removes the saved entry.
unsetHiveContext <- function() {
  previousSession <- get(".prevSparkRsession", envir = .sparkREnv)
  assign(".sparkRsession", previousSession, envir = .sparkREnv)
  remove(".prevSparkRsession", envir = .sparkREnv)
}

# Tests for SparkSQL functions in SparkR

filesBefore <- list.files(path = sparkRDir, all.files = TRUE)
sparkSession <- sparkR.session()
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)

mockLines <- c("{\"name\":\"Michael\"}",
               "{\"name\":\"Andy\", \"age\":30}",
               "{\"name\":\"Justin\", \"age\":19}")
jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
orcPath <- tempfile(pattern = "sparkr-test", fileext = ".orc")
writeLines(mockLines, jsonPath)

# For test nafunctions, like dropna(), fillna(),...
mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
                 "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
                 "{\"name\":\"David\",\"age\":60,\"height\":null}",
                 "{\"name\":\"Amy\",\"age\":null,\"height\":null}",
                 "{\"name\":null,\"age\":null,\"height\":null}")
jsonPathNa <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesNa, jsonPathNa)

# For test complex types in DataFrame
mockLinesComplexType <-
  c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}",
    "{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}",
    "{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}")
complexTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesComplexType, complexTypeJsonPath)

test_that("calling sparkRSQL.init returns existing SQL context", {
  sqlContext <- suppressWarnings(sparkRSQL.init(sc))
  expect_equal(suppressWarnings(sparkRSQL.init(sc)), sqlContext)
})

test_that("calling sparkRSQL.init returns existing SparkSession", {
  expect_equal(suppressWarnings(sparkRSQL.init(sc)), sparkSession)
})

test_that("calling sparkR.session returns existing SparkSession", {
  expect_equal(sparkR.session(), sparkSession)
})

test_that("infer types and check types", {
  expect_equal(infer_type(1L), "integer")
  expect_equal(infer_type(1.0), "double")
  expect_equal(infer_type("abc"), "string")
  expect_equal(infer_type(TRUE), "boolean")
  expect_equal(infer_type(as.Date("2015-03-11")), "date")
  expect_equal(infer_type(as.POSIXlt("2015-03-11 12:13:04.043")), "timestamp")
  expect_equal(infer_type(c(1L, 2L)), "array<integer>")
  expect_equal(infer_type(list(1L, 2L)), "array<integer>")
  expect_equal(infer_type(listToStruct(list(a = 1L, b = "2"))), "struct<a:integer,b:string>")
  e <- new.env()
  assign("a", 1L, envir = e)
  expect_equal(infer_type(e), "map<string,integer>")

  expect_error(checkType("map<integer,integer>"), "Key type in a map must be string or character")

  expect_equal(infer_type(as.raw(c(1, 2, 3))), "binary")
})

test_that("structType and structField", {
  testField <- structField("a", "string")
  expect_is(testField, "structField")
  expect_equal(testField$name(), "a")
  expect_true(testField$nullable())

  testSchema <- structType(testField, structField("b", "integer"))
  expect_is(testSchema, "structType")
  expect_is(testSchema$fields()[[2]], "structField")
  expect_equal(testSchema$fields()[[1]]$dataType.toString(), "StringType")
})

test_that("create DataFrame from RDD", {
  rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
  df <- createDataFrame(rdd, list("a", "b"))
  dfAsDF <- as.DataFrame(rdd, list("a", "b"))
  expect_is(df, "SparkDataFrame")
  expect_is(dfAsDF, "SparkDataFrame")
  expect_equal(count(df), 10)
  expect_equal(count(dfAsDF), 10)
  expect_equal(nrow(df), 10)
  expect_equal(nrow(dfAsDF), 10)
  expect_equal(ncol(df), 2)
  expect_equal(ncol(dfAsDF), 2)
  expect_equal(dim(df), c(10, 2))
  expect_equal(dim(dfAsDF), c(10, 2))
  expect_equal(columns(df), c("a", "b"))
  expect_equal(columns(dfAsDF), c("a", "b"))
  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
  expect_equal(dtypes(dfAsDF), list(c("a", "int"), c("b", "string")))

  df <- createDataFrame(rdd)
  dfAsDF <- as.DataFrame(rdd)
  expect_is(df, "SparkDataFrame")
  expect_is(dfAsDF, "SparkDataFrame")
  expect_equal(columns(df), c("_1", "_2"))
  expect_equal(columns(dfAsDF), c("_1", "_2"))

  schema <- structType(structField(x = "a", type = "integer", nullable = TRUE),
                       structField(x = "b", type = "string", nullable = TRUE))
  df <- createDataFrame(rdd, schema)
  expect_is(df, "SparkDataFrame")
  expect_equal(columns(df), c("a", "b"))
  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))

  rdd <- lapply(parallelize(sc, 1:10), function(x) { list(a = x, b = as.character(x)) })
  df <- createDataFrame(rdd)
  expect_is(df, "SparkDataFrame")
  expect_equal(count(df), 10)
  expect_equal(columns(df), c("a", "b"))
  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))

  schema <- structType(structField("name", "string"), structField("age", "integer"),
                       structField("height", "float"))
  df <- read.df(jsonPathNa, "json", schema)
  df2 <- createDataFrame(toRDD(df), schema)
  df2AsDF <- as.DataFrame(toRDD(df), schema)
  expect_equal(columns(df2), c("name", "age", "height"))
  expect_equal(columns(df2AsDF), c("name", "age", "height"))
  expect_equal(dtypes(df2), list(c("name", "string"), c("age", "int"), c("height", "float")))
  expect_equal(dtypes(df2AsDF), list(c("name", "string"), c("age", "int"), c("height", "float")))
  expect_equal(as.list(collect(where(df2, df2$name == "Bob"))),
               list(name = "Bob", age = 16, height = 176.5))
  expect_equal(as.list(collect(where(df2AsDF, df2AsDF$name == "Bob"))),
               list(name = "Bob", age = 16, height = 176.5))

  localDF <- data.frame(name = c("John", "Smith", "Sarah"),
                        age = c(19L, 23L, 18L),
                        height = c(176.5, 181.4, 173.7))
  df <- createDataFrame(localDF, schema)
  expect_is(df, "SparkDataFrame")
  expect_equal(count(df), 3)
  expect_equal(columns(df), c("name", "age", "height"))
  expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
  expect_equal(as.list(collect(where(df, df$name == "John"))),
               list(name = "John", age = 19L, height = 176.5))
  expect_equal(getNumPartitions(df), 1)

  df <- as.DataFrame(cars, numPartitions = 2)
  expect_equal(getNumPartitions(df), 2)
  df <- createDataFrame(cars, numPartitions = 3)
  expect_equal(getNumPartitions(df), 3)
  # validate limit by num of rows
  df <- createDataFrame(cars, numPartitions = 60)
  expect_equal(getNumPartitions(df), 50)
  # validate when 1 < (length(coll) / numSlices) << length(coll)
  df <- createDataFrame(cars, numPartitions = 20)
  expect_equal(getNumPartitions(df), 20)

  df <- as.DataFrame(data.frame(0))
  expect_is(df, "SparkDataFrame")
  df <- createDataFrame(list(list(1)))
  expect_is(df, "SparkDataFrame")
  df <- as.DataFrame(data.frame(0), numPartitions = 2)
  # no data to partition, goes to 1
  expect_equal(getNumPartitions(df), 1)

  # The remaining checks run against the Hive test session; the previous
  # session is put back by unsetHiveContext() at the end.
  setHiveContext(sc)
  sql("CREATE TABLE people (name string, age double, height float)")
  df <- read.df(jsonPathNa, "json", schema)
  invisible(insertInto(df, "people"))
  expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age,
               c(16))
  expect_equal(collect(sql("SELECT height from people WHERE name ='Bob'"))$height,
               c(176.5))
  sql("DROP TABLE people")
  unsetHiveContext()
})

test_that("createDataFrame uses files for large objects", {
  # To simulate a large file scenario, we set spark.r.maxAllocationLimit to a smaller value
  conf <- callJMethod(sparkSession, "conf")
  callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
  df <- suppressWarnings(createDataFrame(iris, numPartitions = 3))
  expect_equal(getNumPartitions(df), 3)

  # Resetting the conf back to default value
  callJMethod(conf, "set", "spark.r.maxAllocationLimit", toString(.Machine$integer.max / 10))
  expect_equal(dim(df), dim(iris))
})

test_that("read/write csv as DataFrame", {
  csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
  mockLinesCsv <- c("year,make,model,comment,blank",
                    "\"2012\",\"Tesla\",\"S\",\"No comment\",",
                    "1997,Ford,E350,\"Go get one now they are going fast\",",
                    "2015,Chevy,Volt",
                    "NA,Dummy,Placeholder")
  writeLines(mockLinesCsv, csvPath)

  # default "header" is false, inferSchema to handle "year" as "int"
  df <- read.df(csvPath, "csv", header = "true", inferSchema = "true")
  expect_equal(count(df), 4)
  expect_equal(columns(df), c("year", "make", "model", "comment", "blank"))
  expect_equal(sort(unlist(collect(where(df, df$year == 2015)))),
               sort(unlist(list(year = 2015, make = "Chevy", model = "Volt"))))

  # since "year" is "int", let's skip the NA values
  withoutna <- na.omit(df, how = "any", cols = "year")
  expect_equal(count(withoutna), 3)

  unlink(csvPath)
  csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
  mockLinesCsv <- c("year,make,model,comment,blank",
                    "\"2012\",\"Tesla\",\"S\",\"No comment\",",
                    "1997,Ford,E350,\"Go get one now they are going fast\",",
                    "2015,Chevy,Volt",
                    "Empty,Dummy,Placeholder")
  writeLines(mockLinesCsv, csvPath)

  df2 <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "Empty")
  expect_equal(count(df2), 4)
  withoutna2 <- na.omit(df2, how = "any", cols = "year")
  expect_equal(count(withoutna2), 3)
  expect_equal(count(where(withoutna2, withoutna2$make == "Dummy")), 0)

  # writing csv file
  csvPath2 <- tempfile(pattern = "csvtest2", fileext = ".csv")
  write.df(df2, path = csvPath2, "csv", header = "true")
  df3 <- read.df(csvPath2, "csv", header = "true")
  expect_equal(nrow(df3), nrow(df2))
  expect_equal(colnames(df3), colnames(df2))
  # csvPath2 is listed as a directory here: the written output is the "part*" files inside it
  csv <- read.csv(file = list.files(csvPath2, pattern = "^part", full.names = TRUE)[[1]])
  expect_equal(colnames(df3), colnames(csv))

  unlink(csvPath)
  # unlink() only removes a directory when recursive = TRUE
  unlink(csvPath2, recursive = TRUE)
})

test_that("Support other types for options", {
  csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
  mockLinesCsv <- c("year,make,model,comment,blank",
                    "\"2012\",\"Tesla\",\"S\",\"No comment\",",
                    "1997,Ford,E350,\"Go get one now they are going fast\",",
                    "2015,Chevy,Volt",
                    "NA,Dummy,Placeholder")
  writeLines(mockLinesCsv, csvPath)

  csvDf <- read.df(csvPath, "csv", header = "true", inferSchema = "true")
  expected <- read.df(csvPath, "csv", header = TRUE, inferSchema = TRUE)
  expect_equal(collect(csvDf), collect(expected))

  expect_error(read.df(csvPath, "csv", header = TRUE, maxColumns = 3))
  unlink(csvPath)
})

test_that("convert NAs to null type in DataFrames", {
  rdd <- parallelize(sc, list(list(1L, 2L), list(NA, 4L)))
  df <- createDataFrame(rdd, list("a", "b"))
  expect_true(is.na(collect(df)[2, "a"]))
  expect_equal(collect(df)[2, "b"], 4L)

  l <- data.frame(x = 1L, y = c(1L, NA_integer_, 3L))
  df <- createDataFrame(l)
  expect_equal(collect(df)[2, "x"], 1L)
  expect_true(is.na(collect(df)[2, "y"]))

  rdd <- parallelize(sc, list(list(1, 2), list(NA, 4)))
  df <- createDataFrame(rdd, list("a", "b"))
  expect_true(is.na(collect(df)[2, "a"]))
  expect_equal(collect(df)[2, "b"], 4)

  l <- data.frame(x = 1, y = c(1, NA_real_, 3))
  df <- createDataFrame(l)
  expect_equal(collect(df)[2, "x"], 1)
  expect_true(is.na(collect(df)[2, "y"]))

  l <- list("a", "b", NA, "d")
  df <- createDataFrame(l)
  expect_true(is.na(collect(df)[3, "_1"]))
  expect_equal(collect(df)[4, "_1"], "d")

  l <- list("a", "b", NA_character_, "d")
  df <- createDataFrame(l)
  expect_true(is.na(collect(df)[3, "_1"]))
  expect_equal(collect(df)[4, "_1"], "d")

  l <- list(TRUE, FALSE, NA, TRUE)
  df <- createDataFrame(l)
  expect_true(is.na(collect(df)[3, "_1"]))
  expect_equal(collect(df)[4, "_1"], TRUE)
})

test_that("toDF", {
  rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
  df <- toDF(rdd, list("a", "b"))
  expect_is(df, "SparkDataFrame")
  expect_equal(count(df), 10)
  expect_equal(columns(df), c("a", "b"))
  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))

  df <- toDF(rdd)
  expect_is(df, "SparkDataFrame")
  expect_equal(columns(df), c("_1", "_2"))

  schema <- structType(structField(x = "a", type = "integer", nullable = TRUE),
                       structField(x = "b", type = "string", nullable = TRUE))
  df <- toDF(rdd, schema)
  expect_is(df, "SparkDataFrame")
  expect_equal(columns(df), c("a", "b"))
  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))

  rdd <- lapply(parallelize(sc, 1:10), function(x) { list(a = x, b = as.character(x)) })
  df <- toDF(rdd)
  expect_is(df, "SparkDataFrame")
  expect_equal(count(df), 10)
  expect_equal(columns(df), c("a", "b"))
  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
})

test_that("create DataFrame from list or data.frame", {
  l <- list(list(1, 2), list(3, 4))
  df <- createDataFrame(l, c("a", "b"))
  expect_equal(columns(df), c("a", "b"))

  l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
  df <- createDataFrame(l)
  expect_equal(columns(df), c("a", "b"))

  a <- 1:3
  b <- c("a", "b", "c")
  ldf <- data.frame(a, b)
  df <- createDataFrame(ldf)
  expect_equal(columns(df), c("a", "b"))
  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
  expect_equal(count(df), 3)
  ldf2 <- collect(df)
  expect_equal(ldf$a, ldf2$a)

  irisdf <- suppressWarnings(createDataFrame(iris))
  iris_collected <- collect(irisdf)
  expect_equivalent(iris_collected[, -5], iris[, -5])
  expect_equal(iris_collected$Species, as.character(iris$Species))

  mtcarsdf <- createDataFrame(mtcars)
  expect_equivalent(collect(mtcarsdf), mtcars)

  bytes <- as.raw(c(1, 2, 3))
  df <- createDataFrame(list(list(bytes)))
  expect_equal(collect(df)[[1]][[1]], bytes)
})

test_that("create DataFrame with different data types", {
  l <- list(a = 1L, b = 2, c = TRUE, d = "ss", e = as.Date("2012-12-13"),
            f = as.POSIXct("2015-03-15 12:13:14.056"))
  df <- createDataFrame(list(l))
  expect_equal(dtypes(df), list(c("a", "int"), c("b", "double"), c("c", "boolean"),
                                c("d", "string"), c("e", "date"), c("f", "timestamp")))
  expect_equal(count(df), 1)
  expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE))
})

test_that("SPARK-17811: can create DataFrame containing NA as date and time", {
  df <- data.frame(
    id = 1:2,
    time = c(as.POSIXlt("2016-01-10"), NA),
    date = c(as.Date("2016-10-01"), NA))

  DF <- collect(createDataFrame(df))
  expect_true(is.na(DF$date[2]))
  expect_equal(DF$date[1], as.Date("2016-10-01"))
  expect_true(is.na(DF$time[2]))
  expect_equal(DF$time[1], as.POSIXlt("2016-01-10"))
})

test_that("create DataFrame with complex types", {
  e <- new.env()
  assign("n", 3L, envir = e)

  s <- listToStruct(list(a = "aa", b = 3L))

  l <- list(as.list(1:10), list("a", "b"), e, s)
  df <- createDataFrame(list(l), c("a", "b", "c", "d"))
  expect_equal(dtypes(df), list(c("a", "array<int>"),
                                c("b", "array<string>"),
                                c("c", "map<string,int>"),
                                c("d", "struct<a:string,b:int>")))
  expect_equal(count(df), 1)
  ldf <- collect(df)
  expect_equal(names(ldf), c("a", "b", "c", "d"))
  expect_equal(ldf[1, 1][[1]], l[[1]])
  expect_equal(ldf[1, 2][[1]], l[[2]])

  e <- ldf$c[[1]]
  expect_equal(class(e), "environment")
  expect_equal(ls(e), "n")
  expect_equal(e$n, 3L)

  s <- ldf$d[[1]]
  expect_equal(class(s), "struct")
  expect_equal(s$a, "aa")
  expect_equal(s$b, 3L)
})

test_that("create DataFrame from a data.frame with complex types", {
  ldf <- data.frame(row.names = 1:2)
  ldf$a_list <- list(list(1, 2), list(3, 4))
  ldf$an_envir <- c(as.environment(list(a = 1, b = 2)), as.environment(list(c = 3)))

  sdf <- createDataFrame(ldf)
  collected <- collect(sdf)

  expect_identical(ldf[, 1, FALSE], collected[, 1, FALSE])
  expect_equal(ldf$an_envir, collected$an_envir)
})

# For test map type and struct type in DataFrame
mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
                      "{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",
                      "{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}")
mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesMapType, mapTypeJsonPath)

test_that("Collect DataFrame with complex types", {
  # ArrayType
  df <- read.json(complexTypeJsonPath)
  ldf <- collect(df)
  expect_equal(nrow(ldf), 3)
  expect_equal(ncol(ldf), 3)
  expect_equal(names(ldf), c("c1", "c2", "c3"))
  expect_equal(ldf$c1, list(list(1, 2, 3), list(4, 5, 6), list(7, 8, 9)))
  expect_equal(ldf$c2, list(list("a", "b", "c"), list("d", "e", "f"), list("g", "h", "i")))
  expect_equal(ldf$c3, list(list(1.0, 2.0, 3.0), list(4.0, 5.0, 6.0), list(7.0, 8.0, 9.0)))

  # MapType
  schema <- structType(structField("name", "string"),
                       structField("info", "map<string,double>"))
  df <- read.df(mapTypeJsonPath, "json", schema)
  expect_equal(dtypes(df), list(c("name", "string"),
                                c("info", "map<string,double>")))
  ldf <- collect(df)
  expect_equal(nrow(ldf), 3)
  expect_equal(ncol(ldf), 2)
  expect_equal(names(ldf), c("name", "info"))
  expect_equal(ldf$name, c("Bob", "Alice", "David"))
  bob <- ldf$info[[1]]
  expect_equal(class(bob), "environment")
  expect_equal(bob$age, 16)
  expect_equal(bob$height, 176.5)

  # StructType
  df <- read.json(mapTypeJsonPath)
  expect_equal(dtypes(df), list(c("info", "struct<age:bigint,height:double>"),
                                c("name", "string")))
  ldf <- collect(df)
  expect_equal(nrow(ldf), 3)
  expect_equal(ncol(ldf), 2)
  expect_equal(names(ldf), c("info", "name"))
  expect_equal(ldf$name, c("Bob", "Alice", "David"))
  bob <- ldf$info[[1]]
  expect_equal(class(bob), "struct")
  expect_equal(bob$age, 16)
  expect_equal(bob$height, 176.5)
})

test_that("read/write json files", {
  # Test read.df
  df <- read.df(jsonPath, "json")
  expect_is(df, "SparkDataFrame")
  expect_equal(count(df), 3)

  # Test read.df with a user defined schema
  schema <- structType(structField("name", type = "string"),
                       structField("age", type = "double"))

  df1 <- read.df(jsonPath, "json", schema)
  expect_is(df1, "SparkDataFrame")
  expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))

  # Test loadDF
  df2 <- loadDF(jsonPath, "json", schema)
  expect_is(df2, "SparkDataFrame")
  expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))

  # Test read.json
  df <- read.json(jsonPath)
  expect_is(df, "SparkDataFrame")
  expect_equal(count(df), 3)

  # Test write.df
  jsonPath2 <- tempfile(pattern = "jsonPath2", fileext = ".json")
  write.df(df, jsonPath2, "json", mode = "overwrite")

  # Test write.json
  jsonPath3 <- tempfile(pattern = "jsonPath3", fileext = ".json")
  write.json(df, jsonPath3)

  # Test read.json()/jsonFile() works with multiple input paths
  jsonDF1 <- read.json(c(jsonPath2, jsonPath3))
  expect_is(jsonDF1, "SparkDataFrame")
  expect_equal(count(jsonDF1), 6)
  # Suppress warnings because jsonFile is deprecated
  jsonDF2 <- suppressWarnings(jsonFile(c(jsonPath2, jsonPath3)))
  expect_is(jsonDF2, "SparkDataFrame")
  expect_equal(count(jsonDF2), 6)

  # recursive = TRUE so the paths are removed even when they are directories
  unlink(jsonPath2, recursive = TRUE)
  unlink(jsonPath3, recursive = TRUE)
})

test_that("read/write json files - compression option", {
  df <- read.df(jsonPath, "json")

  # From here on `jsonPath` is a test-local output path, not the shared fixture
  jsonPath <- tempfile(pattern = "jsonPath", fileext = ".json")
  write.json(df, jsonPath, compression = "gzip")
  jsonDF <- read.json(jsonPath)
  expect_is(jsonDF, "SparkDataFrame")
  expect_equal(count(jsonDF), count(df))
  expect_true(length(list.files(jsonPath, pattern = ".gz")) > 0)

  # jsonPath is listed as a directory above, so it needs recursive = TRUE
  unlink(jsonPath, recursive = TRUE)
})

test_that("jsonRDD() on a RDD with json string", {
  sqlContext <- suppressWarnings(sparkRSQL.init(sc))
  rdd <- parallelize(sc, mockLines)
  expect_equal(countRDD(rdd), 3)
  df <- suppressWarnings(jsonRDD(sqlContext, rdd))
  expect_is(df, "SparkDataFrame")
  expect_equal(count(df), 3)

  rdd2 <- flatMap(rdd, function(x) c(x, x))
  df <- suppressWarnings(jsonRDD(sqlContext, rdd2))
  expect_is(df, "SparkDataFrame")
  expect_equal(count(df), 6)
})

test_that("test tableNames and tables", {
  df <- read.json(jsonPath)
  createOrReplaceTempView(df, "table1")
  expect_equal(length(tableNames()), 1)
  tables <- tables()
  expect_equal(count(tables), 1)

  suppressWarnings(registerTempTable(df, "table2"))
  tables <- tables()
  expect_equal(count(tables), 2)
  suppressWarnings(dropTempTable("table1"))
  expect_true(dropTempView("table2"))

  tables <- tables()
  expect_equal(count(tables), 0)
})

test_that(
  "createOrReplaceTempView() results in a queryable table and sql() results in a new DataFrame", {
  df <- read.json(jsonPath)
  createOrReplaceTempView(df, "table1")
  newdf <- sql("SELECT * FROM table1 where name = 'Michael'")
  expect_is(newdf, "SparkDataFrame")
  expect_equal(count(newdf), 1)
  expect_true(dropTempView("table1"))

  createOrReplaceTempView(df, "dfView")
  sqlCast <- collect(sql("select cast('2' as decimal) as x from dfView limit 1"))
  out <- capture.output(sqlCast)
  expect_true(is.data.frame(sqlCast))
  expect_equal(names(sqlCast)[1], "x")
  expect_equal(nrow(sqlCast), 1)
  expect_equal(ncol(sqlCast), 1)
  # Printing a data.frame pads the header past the row-name column, so the
  # header line is two spaces followed by the column name.
  expect_equal(out[1], "  x")
  expect_equal(out[2], "1 2")
  expect_true(dropTempView("dfView"))
})

test_that("test cache, uncache and clearCache", {
  df <- read.json(jsonPath)
  createOrReplaceTempView(df, "table1")
  cacheTable("table1")
  uncacheTable("table1")
  clearCache()
  expect_true(dropTempView("table1"))
})

test_that("insertInto() on a registered table", {
  df <- read.df(jsonPath, "json")
  write.df(df, parquetPath, "parquet", "overwrite")
  dfParquet <- read.df(parquetPath, "parquet")

  lines <- c("{\"name\":\"Bob\", \"age\":24}",
             "{\"name\":\"James\", \"age\":35}")
  jsonPath2 <- tempfile(pattern = "jsonPath2", fileext = ".tmp")
  parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
  writeLines(lines, jsonPath2)
  df2 <- read.df(jsonPath2, "json")
  write.df(df2, parquetPath2, "parquet", "overwrite")
  dfParquet2 <- read.df(parquetPath2, "parquet")

  createOrReplaceTempView(dfParquet, "table1")
  insertInto(dfParquet2, "table1")
  expect_equal(count(sql("select * from table1")), 5)
  expect_equal(first(sql("select * from table1 order by age"))$name, "Michael")
  expect_true(dropTempView("table1"))

  createOrReplaceTempView(dfParquet, "table1")
  insertInto(dfParquet2, "table1", overwrite = TRUE)
  expect_equal(count(sql("select * from table1")), 2)
  expect_equal(first(sql("select * from table1 order by age"))$name, "Bob")
  expect_true(dropTempView("table1"))

  unlink(jsonPath2)
  # recursive = TRUE so the parquet output is removed even when it is a directory
  unlink(parquetPath2, recursive = TRUE)
})

test_that("tableToDF() returns a new DataFrame", {
  df <- read.json(jsonPath)
  createOrReplaceTempView(df, "table1")
  tabledf <- tableToDF("table1")
  expect_is(tabledf, "SparkDataFrame")
  expect_equal(count(tabledf), 3)
  tabledf2 <- tableToDF("table1")
  expect_equal(count(tabledf2), 3)
  expect_true(dropTempView("table1"))
})

test_that("toRDD() returns an RRDD", {
  df <- read.json(jsonPath)
  testRDD <- toRDD(df)
  expect_is(testRDD, "RDD")
  expect_equal(countRDD(testRDD), 3)
})

test_that("union on two RDDs created from DataFrames returns an RRDD", {
  df <- read.json(jsonPath)
  RDD1 <- toRDD(df)
  RDD2 <- toRDD(df)
  unioned <- unionRDD(RDD1, RDD2)
  expect_is(unioned, "RDD")
  expect_equal(getSerializedMode(unioned), "byte")
  expect_equal(collectRDD(unioned)[[2]]$name, "Andy")
})

test_that("union on mixed serialization types correctly returns a byte RRDD", {
  # Byte RDD
  nums <- 1:10
  rdd <- parallelize(sc, nums, 2L)

  # String RDD
  textLines <- c("Michael",
                 "Andy, 30",
                 "Justin, 19")
  textPath <- tempfile(pattern = "sparkr-textLines", fileext = ".tmp")
  writeLines(textLines, textPath)
  textRDD <- textFile(sc, textPath)

  df <- read.json(jsonPath)
  dfRDD <- toRDD(df)

  unionByte <- unionRDD(rdd, dfRDD)
  expect_is(unionByte, "RDD")
  expect_equal(getSerializedMode(unionByte), "byte")
  expect_equal(collectRDD(unionByte)[[1]], 1)
  expect_equal(collectRDD(unionByte)[[12]]$name, "Andy")

  unionString <- unionRDD(textRDD, dfRDD)
  expect_is(unionString, "RDD")
  expect_equal(getSerializedMode(unionString), "byte")
  expect_equal(collectRDD(unionString)[[1]], "Michael")
  expect_equal(collectRDD(unionString)[[5]]$name, "Andy")

  # Remove the text fixture once every collect that reads it has run
  unlink(textPath)
})

test_that("objectFile() works with row serialization", {
  objectPath <- tempfile(pattern = "spark-test", fileext = ".tmp")
  df <- read.json(jsonPath)
  dfRDD <- toRDD(df)
  saveAsObjectFile(coalesceRDD(dfRDD, 1L), objectPath)
  objectIn <- objectFile(sc, objectPath)

  expect_is(objectIn, "RDD")
  expect_equal(getSerializedMode(objectIn), "byte")
  expect_equal(collectRDD(objectIn)[[2]]$age, 30)

  # Remove the saved output; recursive = TRUE covers the case where it is a directory
  unlink(objectPath, recursive = TRUE)
})

test_that("lapply() on a DataFrame returns an RDD with the correct columns", {
  df <- read.json(jsonPath)
  testRDD <- lapply(df, function(row) {
    row$newCol <- row$age + 5
    row
  })
  expect_is(testRDD, "RDD")
  collected <- collectRDD(testRDD)
  expect_equal(collected[[1]]$name, "Michael")
  expect_equal(collected[[2]]$newCol, 35)
})

test_that("collect() returns a data.frame", {
  df <- read.json(jsonPath)
  rdf <- collect(df)
  expect_true(is.data.frame(rdf))
  expect_equal(names(rdf)[1], "age")
  expect_equal(nrow(rdf), 3)
  expect_equal(ncol(rdf), 2)

  # collect() returns data correctly from a DataFrame with 0 row
  df0 <- limit(df, 0)
  rdf <- collect(df0)
  expect_true(is.data.frame(rdf))
  expect_equal(names(rdf)[1], "age")
  expect_equal(nrow(rdf), 0)
  expect_equal(ncol(rdf), 2)

  # collect() correctly handles multiple columns with same name
  df <- createDataFrame(list(list(1, 2)), schema = c("name", "name"))
  ldf <- collect(df)
  expect_equal(names(ldf), c("name", "name"))
})

test_that("limit() returns DataFrame with the correct number of rows", {
  df <- read.json(jsonPath)
  dfLimited <- limit(df, 2)
  expect_is(dfLimited, "SparkDataFrame")
  expect_equal(count(dfLimited), 2)
})

test_that("collect() and take() on a DataFrame return the same number of rows and columns", {
  df <- read.json(jsonPath)
  expect_equal(nrow(collect(df)), nrow(take(df, 10)))
  expect_equal(ncol(collect(df)), ncol(take(df, 10)))
})

test_that("collect() support Unicode characters", {
  lines <- c("{\"name\":\"안녕하세요\"}",
             "{\"name\":\"您好\", \"age\":30}",
             "{\"name\":\"こんにちは\", \"age\":19}",
             "{\"name\":\"Xin chào\"}")

  # Test-local fixture; the name `jsonPath` is reused for it inside this test
  jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
  writeLines(lines, jsonPath)

  df <- read.df(jsonPath, "json")
  rdf <- collect(df)
  expect_true(is.data.frame(rdf))
  expect_equal(rdf$name[1], markUtf8("안녕하세요"))
  expect_equal(rdf$name[2], markUtf8("您好"))
  expect_equal(rdf$name[3], markUtf8("こんにちは"))
  expect_equal(rdf$name[4], markUtf8("Xin chào"))

  df1 <- createDataFrame(rdf)
  expect_equal(collect(where(df1, df1$name == markUtf8("您好")))$name, markUtf8("您好"))

  # Remove the test-local fixture written above
  unlink(jsonPath)
})

test_that("multiple pipeline transformations result in an RDD with the correct values", {
  df <- read.json(jsonPath)
  first <- lapply(df, function(row) {
    row$age <- row$age + 5
    row
  })
  second <- lapply(first, function(row) {
    # Check for NA before comparing so the comparison never sees a missing age
    row$testCol <- if (!is.na(row$age) && row$age == 35) TRUE else FALSE
    row
  })
  expect_is(second, "RDD")
  expect_equal(countRDD(second), 3)
  expect_equal(collectRDD(second)[[2]]$age, 35)
  expect_true(collectRDD(second)[[2]]$testCol)
  expect_false(collectRDD(second)[[3]]$testCol)
})

test_that("cache(), storageLevel(), persist(), and unpersist() on a DataFrame", {
  df <- read.json(jsonPath)
  expect_false(df@env$isCached)
  cache(df)
  expect_true(df@env$isCached)

  unpersist(df)
  expect_false(df@env$isCached)

  persist(df, "MEMORY_AND_DISK")
  expect_true(df@env$isCached)

  expect_equal(storageLevel(df),
    "MEMORY_AND_DISK - StorageLevel(disk, memory, deserialized, 1 replicas)")

  unpersist(df)
  expect_false(df@env$isCached)

  # make sure the data is collectable
  expect_true(is.data.frame(collect(df)))
})

test_that("schema(), dtypes(), columns(), names() return the correct values/format", {
  df <- read.json(jsonPath)
  testSchema <- schema(df)
  expect_equal(length(testSchema$fields()), 2)
  expect_equal(testSchema$fields()[[1]]$dataType.toString(), "LongType")
  expect_equal(testSchema$fields()[[2]]$dataType.simpleString(), "string")
  expect_equal(testSchema$fields()[[1]]$name(), "age")

  testTypes <- dtypes(df)
  expect_equal(length(testTypes[[1]]), 2)
  expect_equal(testTypes[[1]][1], "age")

  testCols <- columns(df)
  expect_equal(length(testCols), 2)
  expect_equal(testCols[2], "name")

  testNames <- names(df)
  expect_equal(length(testNames), 2)
  expect_equal(testNames[2], "name")
})

test_that("names() colnames() set the column names", {
  df <- read.json(jsonPath)
  names(df) <- c("col1", "col2")
  expect_equal(colnames(df)[2], "col2")

  colnames(df) <- c("col3", "col4")
  expect_equal(names(df)[1], "col3")

  expect_error(colnames(df) <- c("sepal.length", "sepal_width"),
               "Column names cannot contain the '.' symbol.")
  expect_error(colnames(df) <- c(1, 2), "Invalid column names.")
  expect_error(colnames(df) <- c("a"),
               "Column names must have the same length as the number of columns in the dataset.")
  expect_error(colnames(df) <- c("1", NA), "Column names cannot be NA.")

  # Note: if this test is broken, remove check for "." character on colnames<- method
  irisDF <- suppressWarnings(createDataFrame(iris))
  expect_equal(names(irisDF)[1], "Sepal_Length")

  # Test base::colnames base::names
  m2 <- cbind(1, 1:4)
  expect_equal(colnames(m2, do.NULL = FALSE), c("col1", "col2"))
  colnames(m2) <- c("x", "Y")
  expect_equal(colnames(m2), c("x", "Y"))

  z <- list(a = 1, b = "c", c = 1:3)
  expect_equal(names(z)[3], "c")
  names(z)[3] <- "c2"
  expect_equal(names(z)[3], "c2")
})

test_that("head() and first() return the correct data", {
  df <- read.json(jsonPath)
  testHead <- head(df)
  expect_equal(nrow(testHead), 3)
  expect_equal(ncol(testHead), 2)

  testHead2 <- head(df, 2)
  expect_equal(nrow(testHead2), 2)
  expect_equal(ncol(testHead2), 2)

  testFirst <- first(df)
  expect_equal(nrow(testFirst), 1)

  # head() and first() return the correct data on
  # a DataFrame with 0 row
  df0 <- limit(df, 0)

  testHead <- head(df0)
  expect_equal(nrow(testHead), 0)
  expect_equal(ncol(testHead), 2)

  testFirst <- first(df0)
  expect_equal(nrow(testFirst), 0)
  expect_equal(ncol(testFirst), 2)
})

test_that("distinct(), unique() and dropDuplicates() on DataFrames", {
  lines <- c("{\"name\":\"Michael\"}",
             "{\"name\":\"Andy\", \"age\":30}",
             "{\"name\":\"Justin\", \"age\":19}",
             "{\"name\":\"Justin\", \"age\":19}")
  jsonPathWithDup <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
  writeLines(lines, jsonPathWithDup)

  df <- read.json(jsonPathWithDup)
  uniques <- distinct(df)
  expect_is(uniques, "SparkDataFrame")
  expect_equal(count(uniques), 3)

  uniques2 <- unique(df)
  expect_is(uniques2, "SparkDataFrame")
  expect_equal(count(uniques2), 3)

  # The JSON fixture is not read again below (df is rebuilt from a local list)
  unlink(jsonPathWithDup)

  # Test dropDuplicates()
  df <- createDataFrame(
    list(
      list(2, 1, 2), list(1, 1, 1),
      list(1, 2, 1), list(2, 1, 2),
      list(2, 2, 2), list(2, 2, 1),
      list(2, 1, 1), list(1, 1, 2),
      list(1, 2, 2), list(1, 2, 1)),
    schema = c("key", "value1", "value2"))
  result <- collect(dropDuplicates(df))
  expected <- rbind.data.frame(
    c(1, 1, 1), c(1, 1, 2), c(1, 2, 1),
    c(1, 2, 2), c(2, 1, 1), c(2, 1, 2),
    c(2, 2, 1), c(2, 2, 2))
  names(expected) <- c("key", "value1", "value2")
  expect_equivalent(
    result[order(result$key, result$value1, result$value2), ],
    expected)

  result <- collect(dropDuplicates(df, c("key", "value1")))
  expected <- rbind.data.frame(
    c(1, 1, 1), c(1, 2, 1), c(2, 1, 2), c(2, 2, 2))
  names(expected) <- c("key", "value1", "value2")
  expect_equivalent(
    result[order(result$key, result$value1, result$value2), ],
    expected)

  result <- collect(dropDuplicates(df, "key", "value1"))
  expected <- rbind.data.frame(
    c(1, 1, 1), c(1, 2, 1), c(2, 1, 2), c(2, 2, 2))
  names(expected) <- c("key", "value1", "value2")
  expect_equivalent(
    result[order(result$key, result$value1, result$value2), ],
    expected)

  result <- collect(dropDuplicates(df, "key"))
  expected <- rbind.data.frame(
    c(1, 1, 1), c(2, 1, 2))
  names(expected) <- c("key", "value1", "value2")
  expect_equivalent(
    result[order(result$key, result$value1, result$value2), ],
    expected)
})

test_that("sample on a DataFrame", {
  df <- read.json(jsonPath)
  sampled <- sample(df, FALSE, 1.0)
  expect_equal(nrow(collect(sampled)), count(df))
  expect_is(sampled, "SparkDataFrame")
  sampled2 <- sample(df, FALSE, 0.1, 0) # set seed for predictable result
  expect_true(count(sampled2) < 3)

  count1 <- count(sample(df, FALSE, 0.1, 0))
  count2 <- count(sample(df, FALSE, 0.1, 0))
  expect_equal(count1, count2)

  # Also test sample_frac
  sampled3 <- sample_frac(df, FALSE, 0.1, 0) # set seed for predictable result
  expect_true(count(sampled3) < 3)

  # nolint start
  # Test base::sample is working
  #expect_equal(length(sample(1:12)), 12)
  # 
nolint end 1003}) 1004 1005test_that("select operators", { 1006 df <- select(read.json(jsonPath), "name", "age") 1007 expect_is(df$name, "Column") 1008 expect_is(df[[2]], "Column") 1009 expect_is(df[["age"]], "Column") 1010 1011 expect_warning(df[[1:2]], 1012 "Subset index has length > 1. Only the first index is used.") 1013 expect_is(suppressWarnings(df[[1:2]]), "Column") 1014 expect_warning(df[[c("name", "age")]], 1015 "Subset index has length > 1. Only the first index is used.") 1016 expect_is(suppressWarnings(df[[c("name", "age")]]), "Column") 1017 1018 expect_warning(df[[1:2]] <- df[[1]], 1019 "Subset index has length > 1. Only the first index is used.") 1020 expect_warning(df[[c("name", "age")]] <- df[[1]], 1021 "Subset index has length > 1. Only the first index is used.") 1022 1023 expect_is(df[, 1, drop = F], "SparkDataFrame") 1024 expect_equal(columns(df[, 1, drop = F]), c("name")) 1025 expect_equal(columns(df[, "age", drop = F]), c("age")) 1026 1027 df2 <- df[, c("age", "name")] 1028 expect_is(df2, "SparkDataFrame") 1029 expect_equal(columns(df2), c("age", "name")) 1030 1031 df$age2 <- df$age 1032 expect_equal(columns(df), c("name", "age", "age2")) 1033 expect_equal(count(where(df, df$age2 == df$age)), 2) 1034 df$age2 <- df$age * 2 1035 expect_equal(columns(df), c("name", "age", "age2")) 1036 expect_equal(count(where(df, df$age2 == df$age * 2)), 2) 1037 df$age2 <- df[["age"]] * 3 1038 expect_equal(columns(df), c("name", "age", "age2")) 1039 expect_equal(count(where(df, df$age2 == df$age * 3)), 2) 1040 1041 df$age2 <- 21 1042 expect_equal(columns(df), c("name", "age", "age2")) 1043 expect_equal(count(where(df, df$age2 == 21)), 3) 1044 1045 df$age2 <- c(22) 1046 expect_equal(columns(df), c("name", "age", "age2")) 1047 expect_equal(count(where(df, df$age2 == 22)), 3) 1048 1049 expect_error(df$age3 <- c(22, NA), 1050 "value must be a Column, literal value as atomic in length of 1, or NULL") 1051 1052 df[["age2"]] <- 23 1053 expect_equal(columns(df), c("name", 
"age", "age2")) 1054 expect_equal(count(where(df, df$age2 == 23)), 3) 1055 1056 df[[3]] <- 24 1057 expect_equal(columns(df), c("name", "age", "age2")) 1058 expect_equal(count(where(df, df$age2 == 24)), 3) 1059 1060 df[[3]] <- df$age 1061 expect_equal(count(where(df, df$age2 == df$age)), 2) 1062 1063 df[["age2"]] <- df[["name"]] 1064 expect_equal(count(where(df, df$age2 == df$name)), 3) 1065 1066 expect_error(df[["age3"]] <- c(22, 23), 1067 "value must be a Column, literal value as atomic in length of 1, or NULL") 1068 1069 # Test parameter drop 1070 expect_equal(class(df[, 1]) == "SparkDataFrame", T) 1071 expect_equal(class(df[, 1, drop = T]) == "Column", T) 1072 expect_equal(class(df[, 1, drop = F]) == "SparkDataFrame", T) 1073 expect_equal(class(df[df$age > 4, 2, drop = T]) == "Column", T) 1074 expect_equal(class(df[df$age > 4, 2, drop = F]) == "SparkDataFrame", T) 1075}) 1076 1077test_that("select with column", { 1078 df <- read.json(jsonPath) 1079 df1 <- select(df, "name") 1080 expect_equal(columns(df1), c("name")) 1081 expect_equal(count(df1), 3) 1082 1083 df2 <- select(df, df$age) 1084 expect_equal(columns(df2), c("age")) 1085 expect_equal(count(df2), 3) 1086 1087 df3 <- select(df, lit("x")) 1088 expect_equal(columns(df3), c("x")) 1089 expect_equal(count(df3), 3) 1090 expect_equal(collect(select(df3, "x"))[[1, 1]], "x") 1091 1092 df4 <- select(df, c("name", "age")) 1093 expect_equal(columns(df4), c("name", "age")) 1094 expect_equal(count(df4), 3) 1095 1096 expect_error(select(df, c("name", "age"), "name"), 1097 "To select multiple columns, use a character vector or list for col") 1098}) 1099 1100test_that("drop column", { 1101 df <- select(read.json(jsonPath), "name", "age") 1102 df1 <- drop(df, "name") 1103 expect_equal(columns(df1), c("age")) 1104 1105 df$age2 <- df$age 1106 df1 <- drop(df, c("name", "age")) 1107 expect_equal(columns(df1), c("age2")) 1108 1109 df1 <- drop(df, df$age) 1110 expect_equal(columns(df1), c("name", "age2")) 1111 1112 df$age2 <- 
NULL 1113 expect_equal(columns(df), c("name", "age")) 1114 df$age3 <- NULL 1115 expect_equal(columns(df), c("name", "age")) 1116 1117 # Test to make sure base::drop is not masked 1118 expect_equal(drop(1:3 %*% 2:4), 20) 1119}) 1120 1121test_that("subsetting", { 1122 # read.json returns columns in random order 1123 df <- select(read.json(jsonPath), "name", "age") 1124 filtered <- df[df$age > 20, ] 1125 expect_equal(count(filtered), 1) 1126 expect_equal(columns(filtered), c("name", "age")) 1127 expect_equal(collect(filtered)$name, "Andy") 1128 1129 df2 <- df[df$age == 19, 1, drop = F] 1130 expect_is(df2, "SparkDataFrame") 1131 expect_equal(count(df2), 1) 1132 expect_equal(columns(df2), c("name")) 1133 expect_equal(collect(df2)$name, "Justin") 1134 1135 df3 <- df[df$age > 20, 2, drop = F] 1136 expect_equal(count(df3), 1) 1137 expect_equal(columns(df3), c("age")) 1138 1139 df4 <- df[df$age %in% c(19, 30), 1:2] 1140 expect_equal(count(df4), 2) 1141 expect_equal(columns(df4), c("name", "age")) 1142 1143 df5 <- df[df$age %in% c(19), c(1, 2)] 1144 expect_equal(count(df5), 1) 1145 expect_equal(columns(df5), c("name", "age")) 1146 1147 df6 <- subset(df, df$age %in% c(30), c(1, 2)) 1148 expect_equal(count(df6), 1) 1149 expect_equal(columns(df6), c("name", "age")) 1150 1151 df7 <- subset(df, select = "name", drop = F) 1152 expect_equal(count(df7), 3) 1153 expect_equal(columns(df7), c("name")) 1154 1155 # Test base::subset is working 1156 expect_equal(nrow(subset(airquality, Temp > 80, select = c(Ozone, Temp))), 68) 1157}) 1158 1159test_that("selectExpr() on a DataFrame", { 1160 df <- read.json(jsonPath) 1161 selected <- selectExpr(df, "age * 2") 1162 expect_equal(names(selected), "(age * 2)") 1163 expect_equal(collect(selected), collect(select(df, df$age * 2L))) 1164 1165 selected2 <- selectExpr(df, "name as newName", "abs(age) as age") 1166 expect_equal(names(selected2), c("newName", "age")) 1167 expect_equal(count(selected2), 3) 1168}) 1169 1170test_that("expr() on a 
DataFrame", { 1171 df <- read.json(jsonPath) 1172 expect_equal(collect(select(df, expr("abs(-123)")))[1, 1], 123) 1173}) 1174 1175test_that("column calculation", { 1176 df <- read.json(jsonPath) 1177 d <- collect(select(df, alias(df$age + 1, "age2"))) 1178 expect_equal(names(d), c("age2")) 1179 df2 <- select(df, lower(df$name), abs(df$age)) 1180 expect_is(df2, "SparkDataFrame") 1181 expect_equal(count(df2), 3) 1182}) 1183 1184test_that("test HiveContext", { 1185 setHiveContext(sc) 1186 df <- createExternalTable("json", jsonPath, "json") 1187 expect_is(df, "SparkDataFrame") 1188 expect_equal(count(df), 3) 1189 df2 <- sql("select * from json") 1190 expect_is(df2, "SparkDataFrame") 1191 expect_equal(count(df2), 3) 1192 1193 jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp") 1194 invisible(saveAsTable(df, "json2", "json", "append", path = jsonPath2)) 1195 df3 <- sql("select * from json2") 1196 expect_is(df3, "SparkDataFrame") 1197 expect_equal(count(df3), 3) 1198 unlink(jsonPath2) 1199 1200 hivetestDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") 1201 invisible(saveAsTable(df, "hivetestbl", path = hivetestDataPath)) 1202 df4 <- sql("select * from hivetestbl") 1203 expect_is(df4, "SparkDataFrame") 1204 expect_equal(count(df4), 3) 1205 unlink(hivetestDataPath) 1206 1207 parquetDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") 1208 invisible(saveAsTable(df, "parquetest", "parquet", mode = "overwrite", path = parquetDataPath)) 1209 df5 <- sql("select * from parquetest") 1210 expect_is(df5, "SparkDataFrame") 1211 expect_equal(count(df5), 3) 1212 unlink(parquetDataPath) 1213 unsetHiveContext() 1214}) 1215 1216test_that("column operators", { 1217 c <- column("a") 1218 c2 <- (- c + 1 - 2) * 3 / 4.0 1219 c3 <- (c + c2 - c2) * c2 %% c2 1220 c4 <- (c > c2) & (c2 <= c3) | (c == c2) & (c2 != c3) 1221 c5 <- c2 ^ c3 ^ c4 1222}) 1223 1224test_that("column functions", { 1225 c <- column("a") 1226 c1 <- abs(c) + acos(c) + approxCountDistinct(c) 
+ ascii(c) + asin(c) + atan(c) 1227 c2 <- avg(c) + base64(c) + bin(c) + bitwiseNOT(c) + cbrt(c) + ceil(c) + cos(c) 1228 c3 <- cosh(c) + count(c) + crc32(c) + hash(c) + exp(c) 1229 c4 <- explode(c) + expm1(c) + factorial(c) + first(c) + floor(c) + hex(c) 1230 c5 <- hour(c) + initcap(c) + last(c) + last_day(c) + length(c) 1231 c6 <- log(c) + (c) + log1p(c) + log2(c) + lower(c) + ltrim(c) + max(c) + md5(c) 1232 c7 <- mean(c) + min(c) + month(c) + negate(c) + posexplode(c) + quarter(c) 1233 c8 <- reverse(c) + rint(c) + round(c) + rtrim(c) + sha1(c) + monotonically_increasing_id() 1234 c9 <- signum(c) + sin(c) + sinh(c) + size(c) + stddev(c) + soundex(c) + sqrt(c) + sum(c) 1235 c10 <- sumDistinct(c) + tan(c) + tanh(c) + toDegrees(c) + toRadians(c) 1236 c11 <- to_date(c) + trim(c) + unbase64(c) + unhex(c) + upper(c) 1237 c12 <- variance(c) 1238 c13 <- lead("col", 1) + lead(c, 1) + lag("col", 1) + lag(c, 1) 1239 c14 <- cume_dist() + ntile(1) + corr(c, c1) 1240 c15 <- dense_rank() + percent_rank() + rank() + row_number() 1241 c16 <- is.nan(c) + isnan(c) + isNaN(c) 1242 c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", "c1") 1243 c18 <- covar_pop(c, c1) + covar_pop("c", "c1") 1244 c19 <- spark_partition_id() + coalesce(c) + coalesce(c1, c2, c3) 1245 1246 # Test if base::is.nan() is exposed 1247 expect_equal(is.nan(c("a", "b")), c(FALSE, FALSE)) 1248 1249 # Test if base::rank() is exposed 1250 expect_equal(class(rank())[[1]], "Column") 1251 expect_equal(rank(1:3), as.numeric(c(1:3))) 1252 1253 df <- read.json(jsonPath) 1254 df2 <- select(df, between(df$age, c(20, 30)), between(df$age, c(10, 20))) 1255 expect_equal(collect(df2)[[2, 1]], TRUE) 1256 expect_equal(collect(df2)[[2, 2]], FALSE) 1257 expect_equal(collect(df2)[[3, 1]], FALSE) 1258 expect_equal(collect(df2)[[3, 2]], TRUE) 1259 1260 df3 <- select(df, between(df$name, c("Apache", "Spark"))) 1261 expect_equal(collect(df3)[[1, 1]], TRUE) 1262 expect_equal(collect(df3)[[2, 1]], FALSE) 1263 
expect_equal(collect(df3)[[3, 1]], TRUE) 1264 1265 df4 <- select(df, countDistinct(df$age, df$name)) 1266 expect_equal(collect(df4)[[1, 1]], 2) 1267 1268 expect_equal(collect(select(df, sum(df$age)))[1, 1], 49) 1269 expect_true(abs(collect(select(df, stddev(df$age)))[1, 1] - 7.778175) < 1e-6) 1270 expect_equal(collect(select(df, var_pop(df$age)))[1, 1], 30.25) 1271 1272 df5 <- createDataFrame(list(list(a = "010101"))) 1273 expect_equal(collect(select(df5, conv(df5$a, 2, 16)))[1, 1], "15") 1274 1275 # Test array_contains() and sort_array() 1276 df <- createDataFrame(list(list(list(1L, 2L, 3L)), list(list(6L, 5L, 4L)))) 1277 result <- collect(select(df, array_contains(df[[1]], 1L)))[[1]] 1278 expect_equal(result, c(TRUE, FALSE)) 1279 1280 result <- collect(select(df, sort_array(df[[1]], FALSE)))[[1]] 1281 expect_equal(result, list(list(3L, 2L, 1L), list(6L, 5L, 4L))) 1282 result <- collect(select(df, sort_array(df[[1]])))[[1]] 1283 expect_equal(result, list(list(1L, 2L, 3L), list(4L, 5L, 6L))) 1284 1285 # Test that stats::lag is working 1286 expect_equal(length(lag(ldeaths, 12)), 72) 1287 1288 # Test struct() 1289 df <- createDataFrame(list(list(1L, 2L, 3L), list(4L, 5L, 6L)), 1290 schema = c("a", "b", "c")) 1291 result <- collect(select(df, alias(struct("a", "c"), "d"))) 1292 expected <- data.frame(row.names = 1:2) 1293 expected$"d" <- list(listToStruct(list(a = 1L, c = 3L)), 1294 listToStruct(list(a = 4L, c = 6L))) 1295 expect_equal(result, expected) 1296 1297 result <- collect(select(df, alias(struct(df$a, df$b), "d"))) 1298 expected <- data.frame(row.names = 1:2) 1299 expected$"d" <- list(listToStruct(list(a = 1L, b = 2L)), 1300 listToStruct(list(a = 4L, b = 5L))) 1301 expect_equal(result, expected) 1302 1303 # Test encode(), decode() 1304 bytes <- as.raw(c(0xe5, 0xa4, 0xa7, 0xe5, 0x8d, 0x83, 0xe4, 0xb8, 0x96, 0xe7, 0x95, 0x8c)) 1305 df <- createDataFrame(list(list(markUtf8("大千世界"), "utf-8", bytes)), 1306 schema = c("a", "b", "c")) 1307 result <- 
collect(select(df, encode(df$a, "utf-8"), decode(df$c, "utf-8"))) 1308 expect_equal(result[[1]][[1]], bytes) 1309 expect_equal(result[[2]], markUtf8("大千世界")) 1310 1311 # Test first(), last() 1312 df <- read.json(jsonPath) 1313 expect_equal(collect(select(df, first(df$age)))[[1]], NA_real_) 1314 expect_equal(collect(select(df, first(df$age, TRUE)))[[1]], 30) 1315 expect_equal(collect(select(df, first("age")))[[1]], NA_real_) 1316 expect_equal(collect(select(df, first("age", TRUE)))[[1]], 30) 1317 expect_equal(collect(select(df, last(df$age)))[[1]], 19) 1318 expect_equal(collect(select(df, last(df$age, TRUE)))[[1]], 19) 1319 expect_equal(collect(select(df, last("age")))[[1]], 19) 1320 expect_equal(collect(select(df, last("age", TRUE)))[[1]], 19) 1321 1322 # Test bround() 1323 df <- createDataFrame(data.frame(x = c(2.5, 3.5))) 1324 expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2) 1325 expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4) 1326}) 1327 1328test_that("column binary mathfunctions", { 1329 lines <- c("{\"a\":1, \"b\":5}", 1330 "{\"a\":2, \"b\":6}", 1331 "{\"a\":3, \"b\":7}", 1332 "{\"a\":4, \"b\":8}") 1333 jsonPathWithDup <- tempfile(pattern = "sparkr-test", fileext = ".tmp") 1334 writeLines(lines, jsonPathWithDup) 1335 df <- read.json(jsonPathWithDup) 1336 expect_equal(collect(select(df, atan2(df$a, df$b)))[1, "ATAN2(a, b)"], atan2(1, 5)) 1337 expect_equal(collect(select(df, atan2(df$a, df$b)))[2, "ATAN2(a, b)"], atan2(2, 6)) 1338 expect_equal(collect(select(df, atan2(df$a, df$b)))[3, "ATAN2(a, b)"], atan2(3, 7)) 1339 expect_equal(collect(select(df, atan2(df$a, df$b)))[4, "ATAN2(a, b)"], atan2(4, 8)) 1340 ## nolint start 1341 expect_equal(collect(select(df, hypot(df$a, df$b)))[1, "HYPOT(a, b)"], sqrt(1^2 + 5^2)) 1342 expect_equal(collect(select(df, hypot(df$a, df$b)))[2, "HYPOT(a, b)"], sqrt(2^2 + 6^2)) 1343 expect_equal(collect(select(df, hypot(df$a, df$b)))[3, "HYPOT(a, b)"], sqrt(3^2 + 7^2)) 1344 expect_equal(collect(select(df, 
hypot(df$a, df$b)))[4, "HYPOT(a, b)"], sqrt(4^2 + 8^2)) 1345 ## nolint end 1346 expect_equal(collect(select(df, shiftLeft(df$b, 1)))[4, 1], 16) 1347 expect_equal(collect(select(df, shiftRight(df$b, 1)))[4, 1], 4) 1348 expect_equal(collect(select(df, shiftRightUnsigned(df$b, 1)))[4, 1], 4) 1349 expect_equal(class(collect(select(df, rand()))[2, 1]), "numeric") 1350 expect_equal(collect(select(df, rand(1)))[1, 1], 0.134, tolerance = 0.01) 1351 expect_equal(class(collect(select(df, randn()))[2, 1]), "numeric") 1352 expect_equal(collect(select(df, randn(1)))[1, 1], -1.03, tolerance = 0.01) 1353}) 1354 1355test_that("string operators", { 1356 df <- read.json(jsonPath) 1357 expect_equal(count(where(df, like(df$name, "A%"))), 1) 1358 expect_equal(count(where(df, startsWith(df$name, "A"))), 1) 1359 expect_true(first(select(df, startsWith(df$name, "M")))[[1]]) 1360 expect_false(first(select(df, startsWith(df$name, "m")))[[1]]) 1361 expect_true(first(select(df, endsWith(df$name, "el")))[[1]]) 1362 expect_equal(first(select(df, substr(df$name, 1, 2)))[[1]], "Mi") 1363 if (as.numeric(R.version$major) >= 3 && as.numeric(R.version$minor) >= 3) { 1364 expect_true(startsWith("Hello World", "Hello")) 1365 expect_false(endsWith("Hello World", "a")) 1366 } 1367 expect_equal(collect(select(df, cast(df$age, "string")))[[2, 1]], "30") 1368 expect_equal(collect(select(df, concat(df$name, lit(":"), df$age)))[[2, 1]], "Andy:30") 1369 expect_equal(collect(select(df, concat_ws(":", df$name)))[[2, 1]], "Andy") 1370 expect_equal(collect(select(df, concat_ws(":", df$name, df$age)))[[2, 1]], "Andy:30") 1371 expect_equal(collect(select(df, instr(df$name, "i")))[, 1], c(2, 0, 5)) 1372 expect_equal(collect(select(df, format_number(df$age, 2)))[2, 1], "30.00") 1373 expect_equal(collect(select(df, sha1(df$name)))[2, 1], 1374 "ab5a000e88b5d9d0fa2575f5c6263eb93452405d") 1375 expect_equal(collect(select(df, sha2(df$name, 256)))[2, 1], 1376 
"80f2aed3c618c423ddf05a2891229fba44942d907173152442cf6591441ed6dc") 1377 expect_equal(collect(select(df, format_string("Name:%s", df$name)))[2, 1], "Name:Andy") 1378 expect_equal(collect(select(df, format_string("%s, %d", df$name, df$age)))[2, 1], "Andy, 30") 1379 expect_equal(collect(select(df, regexp_extract(df$name, "(n.y)", 1)))[2, 1], "ndy") 1380 expect_equal(collect(select(df, regexp_replace(df$name, "(n.y)", "ydn")))[2, 1], "Aydn") 1381 1382 l2 <- list(list(a = "aaads")) 1383 df2 <- createDataFrame(l2) 1384 expect_equal(collect(select(df2, locate("aa", df2$a)))[1, 1], 1) 1385 expect_equal(collect(select(df2, locate("aa", df2$a, 2)))[1, 1], 2) 1386 expect_equal(collect(select(df2, lpad(df2$a, 8, "#")))[1, 1], "###aaads") # nolint 1387 expect_equal(collect(select(df2, rpad(df2$a, 8, "#")))[1, 1], "aaads###") # nolint 1388 1389 l3 <- list(list(a = "a.b.c.d")) 1390 df3 <- createDataFrame(l3) 1391 expect_equal(collect(select(df3, substring_index(df3$a, ".", 2)))[1, 1], "a.b") 1392 expect_equal(collect(select(df3, substring_index(df3$a, ".", -3)))[1, 1], "b.c.d") 1393 expect_equal(collect(select(df3, translate(df3$a, "bc", "12")))[1, 1], "a.1.2.d") 1394}) 1395 1396test_that("date functions on a DataFrame", { 1397 .originalTimeZone <- Sys.getenv("TZ") 1398 Sys.setenv(TZ = "UTC") 1399 l <- list(list(a = 1L, b = as.Date("2012-12-13")), 1400 list(a = 2L, b = as.Date("2013-12-14")), 1401 list(a = 3L, b = as.Date("2014-12-15"))) 1402 df <- createDataFrame(l) 1403 expect_equal(collect(select(df, dayofmonth(df$b)))[, 1], c(13, 14, 15)) 1404 expect_equal(collect(select(df, dayofyear(df$b)))[, 1], c(348, 348, 349)) 1405 expect_equal(collect(select(df, weekofyear(df$b)))[, 1], c(50, 50, 51)) 1406 expect_equal(collect(select(df, year(df$b)))[, 1], c(2012, 2013, 2014)) 1407 expect_equal(collect(select(df, month(df$b)))[, 1], c(12, 12, 12)) 1408 expect_equal(collect(select(df, last_day(df$b)))[, 1], 1409 c(as.Date("2012-12-31"), as.Date("2013-12-31"), as.Date("2014-12-31"))) 
1410 expect_equal(collect(select(df, next_day(df$b, "MONDAY")))[, 1], 1411 c(as.Date("2012-12-17"), as.Date("2013-12-16"), as.Date("2014-12-22"))) 1412 expect_equal(collect(select(df, date_format(df$b, "y")))[, 1], c("2012", "2013", "2014")) 1413 expect_equal(collect(select(df, add_months(df$b, 3)))[, 1], 1414 c(as.Date("2013-03-13"), as.Date("2014-03-14"), as.Date("2015-03-15"))) 1415 expect_equal(collect(select(df, date_add(df$b, 1)))[, 1], 1416 c(as.Date("2012-12-14"), as.Date("2013-12-15"), as.Date("2014-12-16"))) 1417 expect_equal(collect(select(df, date_sub(df$b, 1)))[, 1], 1418 c(as.Date("2012-12-12"), as.Date("2013-12-13"), as.Date("2014-12-14"))) 1419 1420 l2 <- list(list(a = 1L, b = as.POSIXlt("2012-12-13 12:34:00", tz = "UTC")), 1421 list(a = 2L, b = as.POSIXlt("2014-12-15 01:24:34", tz = "UTC"))) 1422 df2 <- createDataFrame(l2) 1423 expect_equal(collect(select(df2, minute(df2$b)))[, 1], c(34, 24)) 1424 expect_equal(collect(select(df2, second(df2$b)))[, 1], c(0, 34)) 1425 expect_equal(collect(select(df2, from_utc_timestamp(df2$b, "JST")))[, 1], 1426 c(as.POSIXlt("2012-12-13 21:34:00 UTC"), as.POSIXlt("2014-12-15 10:24:34 UTC"))) 1427 expect_equal(collect(select(df2, to_utc_timestamp(df2$b, "JST")))[, 1], 1428 c(as.POSIXlt("2012-12-13 03:34:00 UTC"), as.POSIXlt("2014-12-14 16:24:34 UTC"))) 1429 expect_gt(collect(select(df2, unix_timestamp()))[1, 1], 0) 1430 expect_gt(collect(select(df2, unix_timestamp(df2$b)))[1, 1], 0) 1431 expect_gt(collect(select(df2, unix_timestamp(lit("2015-01-01"), "yyyy-MM-dd")))[1, 1], 0) 1432 1433 l3 <- list(list(a = 1000), list(a = -1000)) 1434 df3 <- createDataFrame(l3) 1435 result31 <- collect(select(df3, from_unixtime(df3$a))) 1436 expect_equal(grep("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}", result31[, 1], perl = TRUE), 1437 c(1, 2)) 1438 result32 <- collect(select(df3, from_unixtime(df3$a, "yyyy"))) 1439 expect_equal(grep("\\d{4}", result32[, 1]), c(1, 2)) 1440 Sys.setenv(TZ = .originalTimeZone) 1441}) 1442 
1443test_that("greatest() and least() on a DataFrame", { 1444 l <- list(list(a = 1, b = 2), list(a = 3, b = 4)) 1445 df <- createDataFrame(l) 1446 expect_equal(collect(select(df, greatest(df$a, df$b)))[, 1], c(2, 4)) 1447 expect_equal(collect(select(df, least(df$a, df$b)))[, 1], c(1, 3)) 1448}) 1449 1450test_that("time windowing (window()) with all inputs", { 1451 df <- createDataFrame(data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) 1452 df$window <- window(df$t, "5 seconds", "5 seconds", "0 seconds") 1453 local <- collect(df)$v 1454 # Not checking time windows because of possible time zone issues. Just checking that the function 1455 # works 1456 expect_equal(local, c(1)) 1457}) 1458 1459test_that("time windowing (window()) with slide duration", { 1460 df <- createDataFrame(data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) 1461 df$window <- window(df$t, "5 seconds", "2 seconds") 1462 local <- collect(df)$v 1463 # Not checking time windows because of possible time zone issues. Just checking that the function 1464 # works 1465 expect_equal(local, c(1, 1)) 1466}) 1467 1468test_that("time windowing (window()) with start time", { 1469 df <- createDataFrame(data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) 1470 df$window <- window(df$t, "5 seconds", startTime = "2 seconds") 1471 local <- collect(df)$v 1472 # Not checking time windows because of possible time zone issues. Just checking that the function 1473 # works 1474 expect_equal(local, c(1)) 1475}) 1476 1477test_that("time windowing (window()) with just window duration", { 1478 df <- createDataFrame(data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) 1479 df$window <- window(df$t, "5 seconds") 1480 local <- collect(df)$v 1481 # Not checking time windows because of possible time zone issues. 
Just checking that the function 1482 # works 1483 expect_equal(local, c(1)) 1484}) 1485 1486test_that("when(), otherwise() and ifelse() on a DataFrame", { 1487 l <- list(list(a = 1, b = 2), list(a = 3, b = 4)) 1488 df <- createDataFrame(l) 1489 expect_equal(collect(select(df, when(df$a > 1 & df$b > 2, 1)))[, 1], c(NA, 1)) 1490 expect_equal(collect(select(df, otherwise(when(df$a > 1, 1), 0)))[, 1], c(0, 1)) 1491 expect_equal(collect(select(df, ifelse(df$a > 1 & df$b > 2, 0, 1)))[, 1], c(1, 0)) 1492}) 1493 1494test_that("when(), otherwise() and ifelse() with column on a DataFrame", { 1495 l <- list(list(a = 1, b = 2), list(a = 3, b = 4)) 1496 df <- createDataFrame(l) 1497 expect_equal(collect(select(df, when(df$a > 1 & df$b > 2, lit(1))))[, 1], c(NA, 1)) 1498 expect_equal(collect(select(df, otherwise(when(df$a > 1, lit(1)), lit(0))))[, 1], c(0, 1)) 1499 expect_equal(collect(select(df, ifelse(df$a > 1 & df$b > 2, lit(0), lit(1))))[, 1], c(1, 0)) 1500}) 1501 1502test_that("group by, agg functions", { 1503 df <- read.json(jsonPath) 1504 df1 <- agg(df, name = "max", age = "sum") 1505 expect_equal(1, count(df1)) 1506 df1 <- agg(df, age2 = max(df$age)) 1507 expect_equal(1, count(df1)) 1508 expect_equal(columns(df1), c("age2")) 1509 1510 gd <- groupBy(df, "name") 1511 expect_is(gd, "GroupedData") 1512 df2 <- count(gd) 1513 expect_is(df2, "SparkDataFrame") 1514 expect_equal(3, count(df2)) 1515 1516 # Also test group_by, summarize, mean 1517 gd1 <- group_by(df, "name") 1518 expect_is(gd1, "GroupedData") 1519 df_summarized <- summarize(gd, mean_age = mean(df$age)) 1520 expect_is(df_summarized, "SparkDataFrame") 1521 expect_equal(3, count(df_summarized)) 1522 1523 df3 <- agg(gd, age = "stddev") 1524 expect_is(df3, "SparkDataFrame") 1525 df3_local <- collect(df3) 1526 expect_true(is.nan(df3_local[df3_local$name == "Andy", ][1, 2])) 1527 1528 df4 <- agg(gd, sumAge = sum(df$age)) 1529 expect_is(df4, "SparkDataFrame") 1530 expect_equal(3, count(df4)) 1531 expect_equal(columns(df4), 
c("name", "sumAge")) 1532 1533 df5 <- sum(gd, "age") 1534 expect_is(df5, "SparkDataFrame") 1535 expect_equal(3, count(df5)) 1536 1537 expect_equal(3, count(mean(gd))) 1538 expect_equal(3, count(max(gd))) 1539 expect_equal(30, collect(max(gd))[2, 2]) 1540 expect_equal(1, collect(count(gd))[1, 2]) 1541 1542 mockLines2 <- c("{\"name\":\"ID1\", \"value\": \"10\"}", 1543 "{\"name\":\"ID1\", \"value\": \"10\"}", 1544 "{\"name\":\"ID1\", \"value\": \"22\"}", 1545 "{\"name\":\"ID2\", \"value\": \"-3\"}") 1546 jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp") 1547 writeLines(mockLines2, jsonPath2) 1548 gd2 <- groupBy(read.json(jsonPath2), "name") 1549 df6 <- agg(gd2, value = "sum") 1550 df6_local <- collect(df6) 1551 expect_equal(42, df6_local[df6_local$name == "ID1", ][1, 2]) 1552 expect_equal(-3, df6_local[df6_local$name == "ID2", ][1, 2]) 1553 1554 df7 <- agg(gd2, value = "stddev") 1555 df7_local <- collect(df7) 1556 expect_true(abs(df7_local[df7_local$name == "ID1", ][1, 2] - 6.928203) < 1e-6) 1557 expect_true(is.nan(df7_local[df7_local$name == "ID2", ][1, 2])) 1558 1559 mockLines3 <- c("{\"name\":\"Andy\", \"age\":30}", 1560 "{\"name\":\"Andy\", \"age\":30}", 1561 "{\"name\":\"Justin\", \"age\":19}", 1562 "{\"name\":\"Justin\", \"age\":1}") 1563 jsonPath3 <- tempfile(pattern = "sparkr-test", fileext = ".tmp") 1564 writeLines(mockLines3, jsonPath3) 1565 df8 <- read.json(jsonPath3) 1566 gd3 <- groupBy(df8, "name") 1567 gd3_local <- collect(sum(gd3)) 1568 expect_equal(60, gd3_local[gd3_local$name == "Andy", ][1, 2]) 1569 expect_equal(20, gd3_local[gd3_local$name == "Justin", ][1, 2]) 1570 1571 expect_true(abs(collect(agg(df, sd(df$age)))[1, 1] - 7.778175) < 1e-6) 1572 gd3_local <- collect(agg(gd3, var(df8$age))) 1573 expect_equal(162, gd3_local[gd3_local$name == "Justin", ][1, 2]) 1574 1575 # Test stats::sd, stats::var are working 1576 expect_true(abs(sd(1:2) - 0.7071068) < 1e-6) 1577 expect_true(abs(var(1:5, 1:5) - 2.5) < 1e-6) 1578 1579 unlink(jsonPath2) 
1580 unlink(jsonPath3) 1581}) 1582 1583test_that("pivot GroupedData column", { 1584 df <- createDataFrame(data.frame( 1585 earnings = c(10000, 10000, 11000, 15000, 12000, 20000, 21000, 22000), 1586 course = c("R", "Python", "R", "Python", "R", "Python", "R", "Python"), 1587 year = c(2013, 2013, 2014, 2014, 2015, 2015, 2016, 2016) 1588 )) 1589 sum1 <- collect(sum(pivot(groupBy(df, "year"), "course"), "earnings")) 1590 sum2 <- collect(sum(pivot(groupBy(df, "year"), "course", c("Python", "R")), "earnings")) 1591 sum3 <- collect(sum(pivot(groupBy(df, "year"), "course", list("Python", "R")), "earnings")) 1592 sum4 <- collect(sum(pivot(groupBy(df, "year"), "course", "R"), "earnings")) 1593 1594 correct_answer <- data.frame( 1595 year = c(2013, 2014, 2015, 2016), 1596 Python = c(10000, 15000, 20000, 22000), 1597 R = c(10000, 11000, 12000, 21000) 1598 ) 1599 expect_equal(sum1, correct_answer) 1600 expect_equal(sum2, correct_answer) 1601 expect_equal(sum3, correct_answer) 1602 expect_equal(sum4, correct_answer[, c("year", "R")]) 1603 1604 expect_error(collect(sum(pivot(groupBy(df, "year"), "course", c("R", "R")), "earnings"))) 1605 expect_error(collect(sum(pivot(groupBy(df, "year"), "course", list("R", "R")), "earnings"))) 1606}) 1607 1608test_that("arrange() and orderBy() on a DataFrame", { 1609 df <- read.json(jsonPath) 1610 sorted <- arrange(df, df$age) 1611 expect_equal(collect(sorted)[1, 2], "Michael") 1612 1613 sorted2 <- arrange(df, "name", decreasing = FALSE) 1614 expect_equal(collect(sorted2)[2, "age"], 19) 1615 1616 sorted3 <- orderBy(df, asc(df$age)) 1617 expect_true(is.na(first(sorted3)$age)) 1618 expect_equal(collect(sorted3)[2, "age"], 19) 1619 1620 sorted4 <- orderBy(df, desc(df$name)) 1621 expect_equal(first(sorted4)$name, "Michael") 1622 expect_equal(collect(sorted4)[3, "name"], "Andy") 1623 1624 sorted5 <- arrange(df, "age", "name", decreasing = TRUE) 1625 expect_equal(collect(sorted5)[1, 2], "Andy") 1626 1627 sorted6 <- arrange(df, "age", "name", 
decreasing = c(T, F)) 1628 expect_equal(collect(sorted6)[1, 2], "Andy") 1629 1630 sorted7 <- arrange(df, "name", decreasing = FALSE) 1631 expect_equal(collect(sorted7)[2, "age"], 19) 1632}) 1633 1634test_that("filter() on a DataFrame", { 1635 df <- read.json(jsonPath) 1636 filtered <- filter(df, "age > 20") 1637 expect_equal(count(filtered), 1) 1638 expect_equal(collect(filtered)$name, "Andy") 1639 filtered2 <- where(df, df$name != "Michael") 1640 expect_equal(count(filtered2), 2) 1641 expect_equal(collect(filtered2)$age[2], 19) 1642 1643 # test suites for %in% 1644 filtered3 <- filter(df, "age in (19)") 1645 expect_equal(count(filtered3), 1) 1646 filtered4 <- filter(df, "age in (19, 30)") 1647 expect_equal(count(filtered4), 2) 1648 filtered5 <- where(df, df$age %in% c(19)) 1649 expect_equal(count(filtered5), 1) 1650 filtered6 <- where(df, df$age %in% c(19, 30)) 1651 expect_equal(count(filtered6), 2) 1652 1653 # Test stats::filter is working 1654 #expect_true(is.ts(filter(1:100, rep(1, 3)))) # nolint 1655}) 1656 1657test_that("join(), crossJoin() and merge() on a DataFrame", { 1658 df <- read.json(jsonPath) 1659 1660 mockLines2 <- c("{\"name\":\"Michael\", \"test\": \"yes\"}", 1661 "{\"name\":\"Andy\", \"test\": \"no\"}", 1662 "{\"name\":\"Justin\", \"test\": \"yes\"}", 1663 "{\"name\":\"Bob\", \"test\": \"yes\"}") 1664 jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp") 1665 writeLines(mockLines2, jsonPath2) 1666 df2 <- read.json(jsonPath2) 1667 1668 # inner join, not cartesian join 1669 expect_equal(count(where(join(df, df2), df$name == df2$name)), 3) 1670 # cartesian join 1671 expect_error(tryCatch(count(join(df, df2)), error = function(e) { stop(e) }), 1672 paste0(".*(org.apache.spark.sql.AnalysisException: Detected cartesian product for", 1673 " INNER join between logical plans).*")) 1674 1675 joined <- crossJoin(df, df2) 1676 expect_equal(names(joined), c("age", "name", "name", "test")) 1677 expect_equal(count(joined), 12) 1678 
# crossJoin(): the collected result keeps both "name" columns.
  expect_equal(names(collect(joined)), c("age", "name", "name", "test"))

  # Fixtures shared by the join-type checks below.
  sameName <- df$name == df2$name
  bothSides <- c("age", "name", "name", "test")

  # Join expression without an explicit join type
  defaultJoin <- join(df, df2, sameName)
  expect_equal(names(defaultJoin), bothSides)
  expect_equal(count(defaultJoin), 3)

  # "rightouter"
  rightOuter <- join(df, df2, sameName, "rightouter")
  expect_equal(names(rightOuter), bothSides)
  expect_equal(count(rightOuter), 4)
  expect_true(is.na(collect(orderBy(rightOuter, rightOuter$age))$age[2]))

  # "outer", followed by a projection with a derived column
  fullOuter <- select(join(df, df2, sameName, "outer"),
                      alias(df$age + 5, "newAge"), df$name, df2$test)
  expect_equal(names(fullOuter), c("newAge", "name", "test"))
  expect_equal(count(fullOuter), 4)
  expect_equal(collect(orderBy(fullOuter, fullOuter$name))$newAge[3], 24)

  # "leftouter"
  leftOuter <- join(df, df2, sameName, "leftouter")
  expect_equal(names(leftOuter), bothSides)
  expect_equal(count(leftOuter), 3)
  expect_true(is.na(collect(orderBy(leftOuter, leftOuter$age))$age[1]))

  # "inner"
  innerJoin <- join(df, df2, sameName, "inner")
  expect_equal(names(innerJoin), bothSides)
  expect_equal(count(innerJoin), 3)

  # "leftsemi" only keeps the left-hand columns
  leftSemi <- join(df, df2, sameName, "leftsemi")
  expect_equal(names(leftSemi), c("age", "name"))
  expect_equal(count(leftSemi), 3)

  # "left_outer" (underscore spelling)
  leftOuter2 <- join(df, df2, sameName, "left_outer")
  expect_equal(names(leftOuter2), bothSides)
  expect_equal(count(leftOuter2), 3)
  expect_true(is.na(collect(orderBy(leftOuter2, leftOuter2$age))$age[1]))

  # "right_outer" (underscore spelling)
  rightOuter2 <- join(df, df2, sameName, "right_outer")
  expect_equal(names(rightOuter2), bothSides)
  expect_equal(count(rightOuter2), 4)
  expect_true(is.na(collect(orderBy(rightOuter2, rightOuter2$age))$age[2]))

  # merge() keeping unmatched rows from both sides
  merged <- merge(df, df2, by.x = "name", by.y = "name", all.x = TRUE, all.y = TRUE)
  expect_equal(count(merged), 4)
# merge() with all.x/all.y: the join columns get the default _x/_y suffixes.
  expect_equal(names(merged), c("age", "name_x", "name_y", "test"))
  expect_equal(collect(orderBy(merged, merged$name_x))$age[3], 19)

  # Custom suffixes
  merged <- merge(df, df2, suffixes = c("-X", "-Y"))
  expect_equal(count(merged), 3)
  expect_equal(names(merged), c("age", "name-X", "name-Y", "test"))
  expect_equal(collect(orderBy(merged, merged$"name-X"))$age[1], 30)

  merged <- merge(df, df2, by = "name", suffixes = c("-X", "-Y"), sort = FALSE)
  expect_equal(count(merged), 3)
  expect_equal(names(merged), c("age", "name-X", "name-Y", "test"))
  expect_equal(collect(orderBy(merged, merged$"name-Y"))$"name-X"[3], "Michael")

  # Spell out TRUE: T is an ordinary variable that can be rebound.
  merged <- merge(df, df2, by = "name", all = TRUE, sort = TRUE)
  expect_equal(count(merged), 4)
  expect_equal(names(merged), c("age", "name_x", "name_y", "test"))
  expect_equal(collect(orderBy(merged, merged$"name_y"))$"name_x"[1], "Andy")

  # by = NULL yields 12 rows (3 x 4) and keeps the original column names
  merged <- merge(df, df2, by = NULL)
  expect_equal(count(merged), 12)
  expect_equal(names(merged), c("age", "name", "name", "test"))

  # The right-hand side already has a column called name_y, so the default
  # suffix would produce a duplicate column name and merge() must fail.
  mockLines3 <- c("{\"name\":\"Michael\", \"name_y\":\"Michael\", \"test\": \"yes\"}",
                  "{\"name\":\"Andy\", \"name_y\":\"Andy\", \"test\": \"no\"}",
                  "{\"name\":\"Justin\", \"name_y\":\"Justin\", \"test\": \"yes\"}",
                  "{\"name\":\"Bob\", \"name_y\":\"Bob\", \"test\": \"yes\"}")
  jsonPath3 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
  writeLines(mockLines3, jsonPath3)
  df3 <- read.json(jsonPath3)
  # The two sentences are intentionally concatenated without a separator.
  expect_error(merge(df, df3),
               paste0("The following column name: name_y occurs more than once in the 'DataFrame'.",
                      "Please use different suffixes for the intersected columns."))

  unlink(jsonPath2)
  unlink(jsonPath3)
})

# toJSON(): the result is a string-serialized RDD.
test_that("toJSON() returns an RDD of the correct values", {
  df <- read.json(jsonPath)
  testRDD <- toJSON(df)
  expect_is(testRDD, "RDD")
  expect_equal(getSerializedMode(testRDD), "string")
# toJSON(): the first serialized row must equal the first input line.
  expect_equal(collectRDD(testRDD)[[1]], mockLines[1])
})

# showDF(): full-width output and output truncated to 3 characters per cell.
test_that("showDF()", {
  df <- read.json(jsonPath)
  # Cells are right-aligned to the column widths given by the border rows.
  expected <- paste0("+----+-------+\n",
                     "| age|   name|\n",
                     "+----+-------+\n",
                     "|null|Michael|\n",
                     "|  30|   Andy|\n",
                     "|  19| Justin|\n",
                     "+----+-------+\n")
  expected2 <- paste0("+---+----+\n",
                      "|age|name|\n",
                      "+---+----+\n",
                      "|nul| Mic|\n",
                      "| 30| And|\n",
                      "| 19| Jus|\n",
                      "+---+----+\n")
  expect_output(showDF(df), expected)
  expect_output(showDF(df, truncate = 3), expected2)
})

test_that("isLocal()", {
  df <- read.json(jsonPath)
  expect_false(isLocal(df))
})

# Set operations on SparkDataFrames; the base R generics of the same name are
# checked at the end of the test.
test_that("union(), rbind(), except(), and intersect() on a DataFrame", {
  df <- read.json(jsonPath)

  lines <- c("{\"name\":\"Bob\", \"age\":24}",
             "{\"name\":\"Andy\", \"age\":30}",
             "{\"name\":\"James\", \"age\":35}")
  jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
  writeLines(lines, jsonPath2)
  df2 <- read.df(jsonPath2, "json")

  unioned <- arrange(union(df, df2), df$age)
  expect_is(unioned, "SparkDataFrame")
  expect_equal(count(unioned), 6)
  expect_equal(first(unioned)$name, "Michael")
  expect_equal(count(arrange(suppressWarnings(unionAll(df, df2)), df$age)), 6)

  unioned2 <- arrange(rbind(unioned, df, df2), df$age)
  expect_is(unioned2, "SparkDataFrame")
  expect_equal(count(unioned2), 12)
  expect_equal(first(unioned2)$name, "Michael")

  # Check the class of the except() result itself, not of `unioned`.
  excepted <- arrange(except(df, df2), desc(df$age))
  expect_is(excepted, "SparkDataFrame")
  expect_equal(count(excepted), 2)
  expect_equal(first(excepted)$name, "Justin")

  # Check the class of the intersect() result itself, not of `unioned`.
  intersected <- arrange(intersect(df, df2), df$age)
  expect_is(intersected, "SparkDataFrame")
  expect_equal(count(intersected), 1)
  expect_equal(first(intersected)$name, "Andy")

  # Test base::union is working
# base::union on plain integer vectors
  expect_equal(union(c(1:3), c(3:5)), c(1:5))

  # base::rbind on plain vectors
  expect_equal(length(rbind(1:4, c = 2, a = 10, 10, deparse.level = 0)), 16)

  # base::intersect on plain vectors
  expect_equal(length(intersect(1:20, 3:23)), 18)

  unlink(jsonPath2)
})

# withColumn() appends or replaces a column; withColumnRenamed() renames one.
test_that("withColumn() and withColumnRenamed()", {
  df <- read.json(jsonPath)
  notMichael <- df$name != "Michael"

  # Append a new column
  appended <- withColumn(df, "newAge", df$age + 2)
  appendedCols <- columns(appended)
  expect_equal(length(appendedCols), 3)
  expect_equal(appendedCols[3], "newAge")
  expect_equal(first(filter(appended, notMichael))$newAge, 32)

  # Replace an existing column with a Column expression
  replaced <- withColumn(df, "age", df$age + 2)
  expect_equal(length(columns(replaced)), 2)
  expect_equal(first(filter(replaced, notMichael))$age, 32)

  # Replace an existing column with a literal
  constant <- withColumn(df, "age", 18)
  expect_equal(length(columns(constant)), 2)
  expect_equal(first(constant)$age, 18)

  # A list is not accepted as a literal
  expect_error(withColumn(df, "age", list("a")),
               "Literal value must be atomic in length of 1")

  renamed <- withColumnRenamed(df, "age", "newerAge")
  renamedCols <- columns(renamed)
  expect_equal(length(renamedCols), 2)
  expect_equal(renamedCols[1], "newerAge")
})

# mutate()/transform()/rename()/names<-: adding, replacing and renaming columns.
test_that("mutate(), transform(), rename() and names()", {
  df <- read.json(jsonPath)
  exceptMichael <- df$name != "Michael"

  # One new column
  oneAdded <- mutate(df, newAge = df$age + 2)
  expect_equal(length(columns(oneAdded)), 3)
  expect_equal(columns(oneAdded)[3], "newAge")
  expect_equal(first(filter(oneAdded, exceptMichael))$newAge, 32)

  # One replaced column plus one new column
  replacedAndAdded <- mutate(df, age = df$age + 2, newAge = df$age + 3)
  expect_equal(length(columns(replacedAndAdded)), 3)
  expect_equal(columns(replacedAndAdded)[3], "newAge")
  expect_equal(first(filter(replacedAndAdded, exceptMichael))$newAge, 33)
  expect_equal(first(filter(replacedAndAdded, exceptMichael))$age, 32)

  # The same two column names assigned twice in one call
  newDF <- mutate(df, age = df$age + 2, newAge = df$age + 3,
                  age = df$age + 4, newAge = df$age + 5)
  expect_equal(length(columns(newDF)), 3)
# mutate(): when a column name is assigned twice, the last assignment wins.
  expect_equal(columns(newDF)[3], "newAge")
  expect_equal(first(filter(newDF, df$name != "Michael"))$newAge, 35)
  expect_equal(first(filter(newDF, df$name != "Michael"))$age, 34)

  # An unnamed expression is named after its source text
  newDF <- mutate(df, df$age + 3)
  expect_equal(length(columns(newDF)), 3)
  expect_equal(columns(newDF)[[3]], "df$age + 3")
  expect_equal(first(filter(newDF, df$name != "Michael"))[[3]], 33)

  newDF2 <- rename(df, newerAge = df$age)
  expect_equal(length(columns(newDF2)), 2)
  expect_equal(columns(newDF2)[1], "newerAge")

  names(newDF2) <- c("newerName", "evenNewerAge")
  expect_equal(length(names(newDF2)), 2)
  expect_equal(names(newDF2)[1], "newerName")

  transformedDF <- transform(df, newAge = -df$age, newAge2 = df$age / 2)
  expect_equal(length(columns(transformedDF)), 4)
  expect_equal(columns(transformedDF)[3], "newAge")
  expect_equal(columns(transformedDF)[4], "newAge2")
  expect_equal(first(filter(transformedDF, transformedDF$name == "Andy"))$newAge, -30)

  # test if base::transform on local data frames works
  # ensure the proper signature is used - otherwise this will fail to run
  attach(airquality)
  result <- transform(Ozone, logOzone = log(Ozone))
  expect_equal(nrow(result), 153)
  expect_equal(ncol(result), 2)
  detach(airquality)
})

# ORC round trip through write.df/read.df and write.orc/read.orc.
test_that("read/write ORC files", {
  # setHiveContext() stays outside tryCatch(): if it skips the test there is
  # no previous session to restore.
  setHiveContext(sc)
  orcPath2 <- tempfile(pattern = "orcPath2", fileext = ".orc")
  tryCatch({
    df <- read.df(jsonPath, "json")

    # Test write.df and read.df
    write.df(df, orcPath, "orc", mode = "overwrite")
    df2 <- read.df(orcPath, "orc")
    expect_is(df2, "SparkDataFrame")
    expect_equal(count(df), count(df2))

    # Test write.orc and read.orc
    write.orc(df, orcPath2)
    orcDF <- read.orc(orcPath2)
    expect_is(orcDF, "SparkDataFrame")
    expect_equal(count(orcDF), count(df))
  }, finally = {
    # The writer produces a directory, which unlink() only removes when
    # recursive = TRUE. Restore the previous session even if the body failed.
    unlink(orcPath2, recursive = TRUE)
    unsetHiveContext()
  })
})

# ORC output written with ZLIB compression must contain *.zlib.orc part files.
test_that("read/write ORC files - compression option", {
  # setHiveContext() stays outside tryCatch(): if it skips the test there is
  # no previous session to restore.
  setHiveContext(sc)
  orcPath2 <- tempfile(pattern = "orcPath2", fileext = ".orc")
  tryCatch({
    df <- read.df(jsonPath, "json")

    write.orc(df, orcPath2, compression = "ZLIB")
    orcDF <- read.orc(orcPath2)
    expect_is(orcDF, "SparkDataFrame")
    expect_equal(count(orcDF), count(df))
    expect_true(length(list.files(orcPath2, pattern = ".zlib.orc")) > 0)
  }, finally = {
    # orcPath2 is a directory (see list.files() above), so it has to be
    # removed recursively. Restore the previous session even on failure.
    unlink(orcPath2, recursive = TRUE)
    unsetHiveContext()
  })
})

# Parquet round trip through the generic and the Parquet-specific readers/writers.
test_that("read/write Parquet files", {
  df <- read.df(jsonPath, "json")
  # Test write.df and read.df
  write.df(df, parquetPath, "parquet", mode = "overwrite")
  df2 <- read.df(parquetPath, "parquet")
  expect_is(df2, "SparkDataFrame")
  expect_equal(count(df2), 3)

  # Test write.parquet/saveAsParquetFile and read.parquet/parquetFile
  parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
  write.parquet(df, parquetPath2)
  parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
  suppressWarnings(saveAsParquetFile(df, parquetPath3))
  parquetDF <- read.parquet(c(parquetPath2, parquetPath3))
  expect_is(parquetDF, "SparkDataFrame")
  expect_equal(count(parquetDF), count(df) * 2)
  parquetDF2 <- suppressWarnings(parquetFile(parquetPath2, parquetPath3))
  expect_is(parquetDF2, "SparkDataFrame")
  expect_equal(count(parquetDF2), count(df) * 2)

  # Test if varargs works with variables
  saveMode <- "overwrite"
  mergeSchema <- "true"
  # Write to the path created for this step instead of re-using parquetPath3.
  parquetPath4 <- tempfile(pattern = "parquetPath4", fileext = ".parquet")
  write.df(df, parquetPath4, "parquet", mode = saveMode, mergeSchema = mergeSchema)

  # Output paths are removed recursively; plain unlink() skips directories.
  unlink(parquetPath2, recursive = TRUE)
  unlink(parquetPath3, recursive = TRUE)
  unlink(parquetPath4, recursive = TRUE)
})

# Parquet output with GZIP compression, then overwriting the same path.
test_that("read/write Parquet files - compression option/mode", {
  df <- read.df(jsonPath, "json")
  tempPath <- tempfile(pattern = "tempPath", fileext = ".parquet")

# Test write.parquet and read.parquet with GZIP compression
  write.parquet(df, tempPath, compression = "GZIP")
  df2 <- read.parquet(tempPath)
  expect_is(df2, "SparkDataFrame")
  expect_equal(count(df2), 3)
  expect_true(length(list.files(tempPath, pattern = ".gz.parquet")) > 0)

  # Overwriting the existing output must succeed
  write.parquet(df, tempPath, mode = "overwrite")
  df3 <- read.parquet(tempPath)
  expect_is(df3, "SparkDataFrame")
  expect_equal(count(df3), 3)

  # tempPath is a directory (see list.files() above); remove it recursively.
  unlink(tempPath, recursive = TRUE)
})

# Text round trip: the JSON fixture is read as plain lines of text.
test_that("read/write text files", {
  # Test write.df and read.df
  df <- read.df(jsonPath, "text")
  expect_is(df, "SparkDataFrame")
  expect_equal(colnames(df), c("value"))
  expect_equal(count(df), 3)
  textPath <- tempfile(pattern = "textPath", fileext = ".txt")
  write.df(df, textPath, "text", mode = "overwrite")

  # Test write.text and read.text
  textPath2 <- tempfile(pattern = "textPath2", fileext = ".txt")
  write.text(df, textPath2)
  df2 <- read.text(c(textPath, textPath2))
  expect_is(df2, "SparkDataFrame")
  expect_equal(colnames(df2), c("value"))
  expect_equal(count(df2), count(df) * 2)

  # Removed recursively so that directory outputs are deleted as well.
  unlink(textPath, recursive = TRUE)
  unlink(textPath2, recursive = TRUE)
})

# Text output with GZIP compression must contain *.gz part files.
test_that("read/write text files - compression option", {
  df <- read.df(jsonPath, "text")

  textPath <- tempfile(pattern = "textPath", fileext = ".txt")
  write.text(df, textPath, compression = "GZIP")
  textDF <- read.text(textPath)
  expect_is(textDF, "SparkDataFrame")
  expect_equal(count(textDF), count(df))
  expect_true(length(list.files(textPath, pattern = ".gz")) > 0)

  # textPath is a directory (see list.files() above); remove it recursively.
  unlink(textPath, recursive = TRUE)
})

# describe()/summary(): statistics come back as strings in a "summary" table.
test_that("describe() and summarize() on a DataFrame", {
  df <- read.json(jsonPath)
  stats <- describe(df, "age")
  expect_equal(collect(stats)[1, "summary"], "count")
  expect_equal(collect(stats)[2, "age"], "24.5")
  expect_equal(collect(stats)[3, "age"], "7.7781745930520225")
  stats <- describe(df)
  expect_equal(collect(stats)[4, "summary"], "min")
  expect_equal(collect(stats)[5, "age"], "30")

  stats2 <- summary(df)
  expect_equal(collect(stats2)[4, "summary"], "min")
  expect_equal(collect(stats2)[5, "age"], "30")

  # SPARK-16425: SparkR summary() fails on column of type logical
  # (no expectation needed: the call only has to complete without an error)
  df <- withColumn(df, "boolean", df$age == 30)
  summary(df)

  # Test base::summary is working
  expect_equal(length(summary(attenu, digits = 4)), 35)
})

# dropna()/na.omit(): expected rows are computed locally from the collected data.
test_that("dropna() and na.omit() on a DataFrame", {
  df <- read.json(jsonPathNa)
  rows <- collect(df)

  # drop with columns

  expected <- rows[!is.na(rows$name), ]
  actual <- collect(dropna(df, cols = "name"))
  expect_identical(expected, actual)
  actual <- collect(na.omit(df, cols = "name"))
  expect_identical(expected, actual)

  expected <- rows[!is.na(rows$age), ]
  actual <- collect(dropna(df, cols = "age"))
  row.names(expected) <- row.names(actual)
  # identical on two dataframes does not work here. Don't know why.
  # use identical on all columns as a workaround.
# dropna(cols = "age"): compare column by column.
  expect_identical(expected$age, actual$age)
  expect_identical(expected$height, actual$height)
  expect_identical(expected$name, actual$name)
  # na.omit() must give the same columns; this result used to go unchecked.
  actual <- collect(na.omit(df, cols = "age"))
  expect_identical(expected$age, actual$age)
  expect_identical(expected$height, actual$height)
  expect_identical(expected$name, actual$name)

  expected <- rows[!is.na(rows$age) & !is.na(rows$height), ]
  actual <- collect(dropna(df, cols = c("age", "height")))
  expect_identical(expected, actual)
  actual <- collect(na.omit(df, cols = c("age", "height")))
  expect_identical(expected, actual)

  expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name), ]
  actual <- collect(dropna(df))
  expect_identical(expected, actual)
  actual <- collect(na.omit(df))
  expect_identical(expected, actual)

  # drop with how

  expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name), ]
  actual <- collect(dropna(df))
  expect_identical(expected, actual)
  actual <- collect(na.omit(df))
  expect_identical(expected, actual)

  expected <- rows[!is.na(rows$age) | !is.na(rows$height) | !is.na(rows$name), ]
  actual <- collect(dropna(df, "all"))
  expect_identical(expected, actual)
  actual <- collect(na.omit(df, "all"))
  expect_identical(expected, actual)

  expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name), ]
  actual <- collect(dropna(df, "any"))
  expect_identical(expected, actual)
  actual <- collect(na.omit(df, "any"))
  expect_identical(expected, actual)

  expected <- rows[!is.na(rows$age) & !is.na(rows$height), ]
  actual <- collect(dropna(df, "any", cols = c("age", "height")))
  expect_identical(expected, actual)
  actual <- collect(na.omit(df, "any", cols = c("age", "height")))
  expect_identical(expected, actual)

  expected <- rows[!is.na(rows$age) | !is.na(rows$height), ]
  actual <- collect(dropna(df, "all", cols = c("age", "height")))
  expect_identical(expected, actual)
  actual <- collect(na.omit(df, "all", cols = c("age", "height")))
  expect_identical(expected, actual)

  # drop with threshold

  expected <- rows[as.integer(!is.na(rows$age)) + as.integer(!is.na(rows$height)) >= 2, ]
  actual <- collect(dropna(df, minNonNulls = 2, cols = c("age", "height")))
  expect_identical(expected, actual)
  actual <- collect(na.omit(df, minNonNulls = 2, cols = c("age", "height")))
  expect_identical(expected, actual)

  expected <- rows[as.integer(!is.na(rows$age)) +
                   as.integer(!is.na(rows$height)) +
                   as.integer(!is.na(rows$name)) >= 3, ]
  actual <- collect(dropna(df, minNonNulls = 3, cols = c("name", "age", "height")))
  expect_identical(expected, actual)
  actual <- collect(na.omit(df, minNonNulls = 3, cols = c("name", "age", "height")))
  expect_identical(expected, actual)

  # Test stats::na.omit is working
  expect_equal(nrow(na.omit(data.frame(x = c(0, 10, NA)))), 2)
})

# fillna(): scalar fill values (optionally restricted to columns) and a named
# list of per-column fill values.
test_that("fillna() on a DataFrame", {
  df <- read.json(jsonPathNa)
  rows <- collect(df)

  # fill with value

  expected <- rows
  expected$age[is.na(expected$age)] <- 50
  expected$height[is.na(expected$height)] <- 50.6
  actual <- collect(fillna(df, 50.6))
  expect_identical(expected, actual)

  expected <- rows
  expected$name[is.na(expected$name)] <- "unknown"
  actual <- collect(fillna(df, "unknown"))
  expect_identical(expected, actual)

  expected <- rows
  expected$age[is.na(expected$age)] <- 50
  actual <- collect(fillna(df, 50.6, "age"))
  expect_identical(expected, actual)

  expected <- rows
  expected$name[is.na(expected$name)] <- "unknown"
  actual <- collect(fillna(df, "unknown", c("age", "name")))
  expect_identical(expected, actual)

  # fill with named list

  expected <- rows
  expected$age[is.na(expected$age)] <- 50
  expected$height[is.na(expected$height)] <- 50.6
  expected$name[is.na(expected$name)] <- "unknown"
  actual <- collect(fillna(df, list("age" = 50, "height" = 50.6, "name" = "unknown")))
  expect_identical(expected, actual)
})

# crosstab(): contingency table of two string columns, returned as a local data.frame.
test_that("crosstab() on a DataFrame", {
  rdd <- lapply(parallelize(sc, 0:3), function(x) {
    list(paste0("a", x %% 3), paste0("b", x %% 2))
  })
  df <- toDF(rdd, list("a", "b"))
  ct <- crosstab(df, "a", "b")
  ordered <- ct[order(ct$a_b), ]
  row.names(ordered) <- NULL
  expected <- data.frame("a_b" = c("a0", "a1", "a2"), "b0" = c(1, 0, 1), "b1" = c(1, 1, 0),
                         stringsAsFactors = FALSE, row.names = NULL)
  expect_identical(expected, ordered)
})

# cov()/corr() on two perfectly correlated columns.
test_that("cov() and corr() on a DataFrame", {
  l <- lapply(c(0:9), function(x) { list(x, x * 2.0) })
  df <- createDataFrame(l, c("singles", "doubles"))
  result <- cov(df, "singles", "doubles")
  expect_true(abs(result - 55.0 / 3) < 1e-12)

  result <- corr(df, "singles", "doubles")
  expect_true(abs(result - 1.0) < 1e-12)
  result <- corr(df, "singles", "doubles", "pearson")
  expect_true(abs(result - 1.0) < 1e-12)

  # Test stats::cov is working
  #expect_true(abs(max(cov(swiss)) - 1739.295) < 1e-3) # nolint
})

# freqItems(): frequent items for several columns at once and for a single column.
test_that("freqItems() on a DataFrame", {
  input <- 1:1000
  rdf <- data.frame(numbers = input, letters = as.character(input),
                    negDoubles = input * -1.0, stringsAsFactors = FALSE)
  # Every third row is overwritten with the same values
  rdf[ input %% 3 == 0, ] <- c(1, "1", -1)
  df <- createDataFrame(rdf)
  multiColResults <- freqItems(df, c("numbers", "letters"), support = 0.1)
  expect_true(1 %in% multiColResults$numbers[[1]])
  expect_true("1" %in% multiColResults$letters[[1]])
  singleColResult <- freqItems(df, "negDoubles", support = 0.1)
  expect_true(-1 %in% head(singleColResult$negDoubles)[[1]])

  l <- lapply(c(0:99), function(i) {
    if (i %% 2 == 0) { list(1L, -1.0) }
    else { list(i, i * -1.0) }})
  df <- createDataFrame(l, c("a", "b"))
  result <- freqItems(df, c("a", "b"), 0.4)
  expect_identical(result[[1]], list(list(1L, 99L)))
  expect_identical(result[[2]], list(list(-1, -99)))
})

# sampleBy(): stratified sampling with per-key fractions and seed 0.
test_that("sampleBy() on a DataFrame", {
  l <- lapply(c(0:99), function(i) { as.character(i %% 3) })
  df <- createDataFrame(l, "key")
  fractions <- list("0" = 0.1, "1" = 0.2)
  sample <- sampleBy(df, "key", fractions, 0)
  result <- collect(orderBy(count(groupBy(sample, "key")), "key"))
  expect_identical(as.list(result[1, ]), list(key = "0", count = 3))
  expect_identical(as.list(result[2, ]), list(key = "1", count = 7))
})

# approxQuantile() with relative error 0.0 on the values 0..99.
test_that("approxQuantile() on a DataFrame", {
  l <- lapply(c(0:99), function(i) { i })
  df <- createDataFrame(l, "key")
  quantiles <- approxQuantile(df, "key", c(0.5, 0.8), 0.0)
  expect_equal(quantiles[[1]], 50)
  expect_equal(quantiles[[2]], 80)
})

# Querying a missing table must surface the analysis error text in R.
test_that("SQL error message is returned from JVM", {
  retError <- tryCatch(sql("select * from blah"), error = function(e) e)
  expect_equal(grepl("Table or view not found", retError), TRUE)
  expect_equal(grepl("blah", retError), TRUE)
})

# Shared fixture for the tests below.
irisDF <- suppressWarnings(createDataFrame(iris))

test_that("Method as.data.frame as a synonym for collect()", {
  expect_equal(as.data.frame(irisDF), collect(irisDF))
  irisDF2 <- irisDF[irisDF$Species == "setosa", ]
  expect_equal(as.data.frame(irisDF2), collect(irisDF2))

  # Make sure as.data.frame in the R base package is not covered
  expect_error(as.data.frame(c(1, 2)), NA)
})

# attach(): columns become visible as single-column SparkDataFrames until detach().
test_that("attach() on a DataFrame", {
  df <- read.json(jsonPath)
  expect_error(age)
  attach(df)
  expect_is(age, "SparkDataFrame")
  expected_age <- data.frame(age = c(NA, 30, 19))
  expect_equal(head(age), expected_age)
  stat <- summary(age)
  expect_equal(collect(stat)[5, "age"], "30")
  # A local `age` masks the attached one until it is removed again
  age <- age$age + 1
  expect_is(age, "Column")
  rm(age)
  stat2 <- summary(age)
  expect_equal(collect(stat2)[5, "age"], "30")
  detach("df")
  stat3 <- summary(df[, "age", drop = FALSE])
  expect_equal(collect(stat3)[5, "age"], "30")
  expect_error(age)
})

# with(): column names can be used directly inside the evaluated expression.
test_that("with() on a DataFrame", {
  df <- suppressWarnings(createDataFrame(iris))
  expect_error(Sepal_Length)
  sum1 <- with(df, list(summary(Sepal_Length), summary(Sepal_Width)))
  expect_equal(collect(sum1[[1]])[1, "Sepal_Length"], "150")
  sum2 <- with(df, distinct(Sepal_Length))
  expect_equal(nrow(sum2), 35)
})

# coltypes(): reading the R types of the columns and casting through `coltypes<-`.
test_that("Method coltypes() to get and set R's data types of a DataFrame", {
  expect_equal(coltypes(irisDF), c(rep("numeric", 4), "character"))

  data <- data.frame(c1 = c(1, 2, 3),
                     c2 = c(TRUE, FALSE, TRUE),
                     c3 = c("2015/01/01 10:00:00", "2015/01/02 10:00:00", "2015/01/03 10:00:00"))

  schema <- structType(structField("c1", "byte"),
                       structField("c3", "boolean"),
                       structField("c4", "timestamp"))

  # Test primitive types
  DF <- createDataFrame(data, schema)
  expect_equal(coltypes(DF), c("integer", "logical", "POSIXct"))
  createOrReplaceTempView(DF, "DFView")
  sqlCast <- sql("select cast('2' as decimal) as x from DFView limit 1")
  expect_equal(coltypes(sqlCast), "numeric")

  # Test complex types
  x <- createDataFrame(list(list(as.environment(
    list("a" = "b", "c" = "d", "e" = "f")))))
  expect_equal(coltypes(x), "map<string,string>")

  df <- selectExpr(read.json(jsonPath), "name", "(age * 1.21) as age")
  expect_equal(dtypes(df), list(c("name", "string"), c("age", "decimal(24,2)")))

  # Casting through coltypes<- must agree with an explicit cast()
  df1 <- select(df, cast(df$age, "integer"))
  coltypes(df) <- c("character", "integer")
  expect_equal(dtypes(df), list(c("name", "string"), c("age", "int")))
  value <- collect(df[, 2, drop = FALSE])[[3, 1]]
  expect_equal(value, collect(df1)[[3, 1]])
  expect_equal(value, 22)

  coltypes(df) <- c(NA, "numeric")
# coltypes<-: an NA entry leaves the column type alone; "numeric" yields double.
  expect_equal(dtypes(df), list(c("name", "string"), c("age", "double")))

  expect_error(coltypes(df) <- c("character"),
               "Length of type vector should match the number of columns for SparkDataFrame")
  expect_error(coltypes(df) <- c("environment", "list"),
               "Only atomic type is supported for column types")
})

# str(): one header line plus one line per column.
test_that("Method str()", {
  # Structure of Iris
  iris2 <- iris
  colnames(iris2) <- c("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species")
  iris2$col <- TRUE
  irisDF2 <- createDataFrame(iris2)

  out <- capture.output(str(irisDF2))
  expect_equal(length(out), 7)
  # Column names are padded to the width of the longest name
  expect_equal(out[1], "'SparkDataFrame': 6 variables:")
  expect_equal(out[2], " $ Sepal_Length: num 5.1 4.9 4.7 4.6 5 5.4")
  expect_equal(out[3], " $ Sepal_Width : num 3.5 3 3.2 3.1 3.6 3.9")
  expect_equal(out[4], " $ Petal_Length: num 1.4 1.4 1.3 1.5 1.4 1.7")
  expect_equal(out[5], " $ Petal_Width : num 0.2 0.2 0.2 0.2 0.2 0.4")
  expect_equal(out[6], paste0(" $ Species     : chr \"setosa\" \"setosa\" \"",
                              "setosa\" \"setosa\" \"setosa\" \"setosa\""))
  expect_equal(out[7], " $ col         : logi TRUE TRUE TRUE TRUE TRUE TRUE")

  createOrReplaceTempView(irisDF2, "irisView")

  sqlCast <- sql("select cast('2' as decimal) as x from irisView limit 1")
  castStr <- capture.output(str(sqlCast))
  expect_equal(length(castStr), 2)
  expect_equal(castStr[1], "'SparkDataFrame': 1 variables:")
  expect_equal(castStr[2], " $ x: num 2")

  # A random dataset with many columns. This test is to check str limits
  # the number of columns. Therefore, it will suffice to check for the
  # number of returned rows
  x <- runif(200, 1, 10)
  df <- data.frame(t(as.matrix(data.frame(x, x, x, x, x, x, x, x, x))))
  DF <- createDataFrame(df)
  out <- capture.output(str(DF))
  expect_equal(length(out), 103)

  # Test utils:::str
  expect_equal(capture.output(utils:::str(iris)), capture.output(str(iris)))
})

# histogram(): bins, counts and centroids for a column name, a Column and a
# derived Column, plus argument validation.
test_that("Histogram", {

  # Basic histogram test with colname
  expect_equal(
    all(histogram(irisDF, "Petal_Width", 8) ==
        data.frame(bins = seq(0, 7),
                   counts = c(48, 2, 7, 21, 24, 19, 15, 14),
                   centroids = seq(0, 7) * 0.3 + 0.25)),
    TRUE)

  # Basic histogram test with Column
  expect_equal(
    all(histogram(irisDF, irisDF$Petal_Width, 8) ==
        data.frame(bins = seq(0, 7),
                   counts = c(48, 2, 7, 21, 24, 19, 15, 14),
                   centroids = seq(0, 7) * 0.3 + 0.25)),
    TRUE)

  # Basic histogram test with derived column
  expect_equal(
    all(round(histogram(irisDF, irisDF$Petal_Width + 1, 8), 2) ==
        data.frame(bins = seq(0, 7),
                   counts = c(48, 2, 7, 21, 24, 19, 15, 14),
                   centroids = seq(0, 7) * 0.3 + 1.25)),
    TRUE)

  # Missing nbins
  expect_equal(length(histogram(irisDF, "Petal_Width")$counts), 10)

  # Wrong colname
  expect_error(histogram(irisDF, "xxx"),
               "Specified colname does not belong to the given SparkDataFrame.")

  # Invalid nbins
  expect_error(histogram(irisDF, "Petal_Width", nbins = 0),
               "The number of bins must be a positive integer number greater than 1.")

  # Test against R's hist
  # plot = FALSE: only the counts are needed, so do not open a graphics device
  expect_equal(all(hist(iris$Sepal.Width, plot = FALSE)$counts ==
                   histogram(irisDF, "Sepal_Width", 12)$counts), TRUE)

  # Test when there are zero counts
  df <- as.DataFrame(data.frame(x = c(1, 2, 3, 4, 100)))
  expect_equal(histogram(df, "x")$counts, c(4, 0, 0, 0, 0, 0, 0, 0, 0, 1))
})

# dapply() needs an output schema; dapplyCollect() returns a local data.frame.
test_that("dapply() and dapplyCollect() on a DataFrame", {
  df <- createDataFrame(
    list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")),
    c("a", "b", "c"))
  ldf <- collect(df)
  df1 <- dapply(df, function(x) { x }, schema(df))
  result <- collect(df1)
  expect_identical(ldf, result)

  result <- dapplyCollect(df, function(x) { x })
  expect_identical(ldf, result)

  # Filter and add a column
  schema <- structType(structField("a", "integer"), structField("b", "double"),
                       structField("c", "string"), structField("d", "integer"))
  df1 <- dapply(
    df,
    function(x) {
      y <- x[x$a > 1, ]
      y <- cbind(y, y$a + 1L)
    },
    schema)
  result <- collect(df1)
  expected <- ldf[ldf$a > 1, ]
  expected$d <- expected$a + 1L
  rownames(expected) <- NULL
  expect_identical(expected, result)

  result <- dapplyCollect(
    df,
    function(x) {
      y <- x[x$a > 1, ]
      y <- cbind(y, y$a + 1L)
    })
  # Without a schema the added column has no agreed name, so names are aligned
  expected1 <- expected
  names(expected1) <- names(result)
  expect_identical(expected1, result)

  # Remove the added column
  df2 <- dapply(
    df1,
    function(x) {
      x[, c("a", "b", "c")]
    },
    schema(df))
  result <- collect(df2)
  expected <- expected[, c("a", "b", "c")]
  expect_identical(expected, result)

  result <- dapplyCollect(
    df1,
    function(x) {
      x[, c("a", "b", "c")]
    })
  expect_identical(expected, result)
})

# Raw (binary) list columns must survive collect() and dapplyCollect().
test_that("dapplyCollect() on DataFrame with a binary column", {

  df <- data.frame(key = 1:3)
  df$bytes <- lapply(df$key, serialize, connection = NULL)

  df_spark <- createDataFrame(df)

  result1 <- collect(df_spark)
  expect_identical(df, result1)

  result2 <- dapplyCollect(df_spark, function(x) x)
  expect_identical(df, result2)

  # A data.frame with a single column of bytes
  scb <- subset(df, select = "bytes")
  scb_spark <- createDataFrame(scb)
  result <- dapplyCollect(scb_spark, function(x) x)
  expect_identical(scb, result)

})

# repartition(): by column, by number of partitions, or both.
test_that("repartition by columns on DataFrame", {
  df <- createDataFrame(
    list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
    c("a", "b", "c", "d"))

  # no column and number of partitions specified
  retError <- tryCatch(repartition(df), error = function(e) e)
  expect_equal(
    grepl("Please, specify the number of partitions and/or a column\\(s\\)", retError),
    TRUE)

  # repartition by column and number of partitions
  actual <- repartition(df, 3, col = df$"a")

  # Checking that at least the dimensions are identical
  expect_identical(dim(df), dim(actual))
  expect_equal(getNumPartitions(actual), 3L)

  # repartition by number of partitions
  actual <- repartition(df, 13L)
  expect_identical(dim(df), dim(actual))
  expect_equal(getNumPartitions(actual), 13L)

  expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)

  # a test case with a column and dapply
  schema <- structType(structField("a", "integer"), structField("avg", "double"))
  df <- repartition(df, col = df$"a")
  df1 <- dapply(
    df,
    function(x) {
      y <- (data.frame(x$a[1], mean(x$b)))
    },
    schema)

  # Number of partitions is equal to 2
  expect_equal(nrow(df1), 2)
})

# coalesce()/repartition(): resulting partition counts.
test_that("coalesce, repartition, numPartitions", {
  df <- as.DataFrame(cars, numPartitions = 5)
  expect_equal(getNumPartitions(df), 5)
  expect_equal(getNumPartitions(coalesce(df, 3)), 3)
  expect_equal(getNumPartitions(coalesce(df, 6)), 5)

  df1 <- coalesce(df, 3)
  expect_equal(getNumPartitions(df1), 3)
  expect_equal(getNumPartitions(coalesce(df1, 6)), 5)
  expect_equal(getNumPartitions(coalesce(df1, 4)), 4)
  expect_equal(getNumPartitions(coalesce(df1, 2)), 2)

  df2 <- repartition(df1, 10)
  expect_equal(getNumPartitions(df2), 10)
expect_equal(getNumPartitions(coalesce(df2, 13)), 5) 2539 expect_equal(getNumPartitions(coalesce(df2, 7)), 5) 2540 expect_equal(getNumPartitions(coalesce(df2, 3)), 3) 2541}) 2542 2543test_that("gapply() and gapplyCollect() on a DataFrame", { 2544 df <- createDataFrame ( 2545 list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), 2546 c("a", "b", "c", "d")) 2547 expected <- collect(df) 2548 df1 <- gapply(df, "a", function(key, x) { x }, schema(df)) 2549 actual <- collect(df1) 2550 expect_identical(actual, expected) 2551 2552 df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x }) 2553 expect_identical(df1Collect, expected) 2554 2555 # Computes the sum of second column by grouping on the first and third columns 2556 # and checks if the sum is larger than 2 2557 schema <- structType(structField("a", "integer"), structField("e", "boolean")) 2558 df2 <- gapply( 2559 df, 2560 c(df$"a", df$"c"), 2561 function(key, x) { 2562 y <- data.frame(key[1], sum(x$b) > 2) 2563 }, 2564 schema) 2565 actual <- collect(df2)$e 2566 expected <- c(TRUE, TRUE) 2567 expect_identical(actual, expected) 2568 2569 df2Collect <- gapplyCollect( 2570 df, 2571 c(df$"a", df$"c"), 2572 function(key, x) { 2573 y <- data.frame(key[1], sum(x$b) > 2) 2574 colnames(y) <- c("a", "e") 2575 y 2576 }) 2577 actual <- df2Collect$e 2578 expect_identical(actual, expected) 2579 2580 # Computes the arithmetic mean of the second column by grouping 2581 # on the first and third columns. Output the groupping value and the average. 
2582 schema <- structType(structField("a", "integer"), structField("c", "string"), 2583 structField("avg", "double")) 2584 df3 <- gapply( 2585 df, 2586 c("a", "c"), 2587 function(key, x) { 2588 y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) 2589 }, 2590 schema) 2591 actual <- collect(df3) 2592 actual <- actual[order(actual$a), ] 2593 rownames(actual) <- NULL 2594 expected <- collect(select(df, "a", "b", "c")) 2595 expected <- data.frame(aggregate(expected$b, by = list(expected$a, expected$c), FUN = mean)) 2596 colnames(expected) <- c("a", "c", "avg") 2597 expected <- expected[order(expected$a), ] 2598 rownames(expected) <- NULL 2599 expect_identical(actual, expected) 2600 2601 df3Collect <- gapplyCollect( 2602 df, 2603 c("a", "c"), 2604 function(key, x) { 2605 y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) 2606 colnames(y) <- c("a", "c", "avg") 2607 y 2608 }) 2609 actual <- df3Collect[order(df3Collect$a), ] 2610 expect_identical(actual$avg, expected$avg) 2611 2612 irisDF <- suppressWarnings(createDataFrame (iris)) 2613 schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double")) 2614 # Groups by `Sepal_Length` and computes the average for `Sepal_Width` 2615 df4 <- gapply( 2616 cols = "Sepal_Length", 2617 irisDF, 2618 function(key, x) { 2619 y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE) 2620 }, 2621 schema) 2622 actual <- collect(df4) 2623 actual <- actual[order(actual$Sepal_Length), ] 2624 rownames(actual) <- NULL 2625 agg_local_df <- data.frame(aggregate(iris$Sepal.Width, by = list(iris$Sepal.Length), FUN = mean), 2626 stringsAsFactors = FALSE) 2627 colnames(agg_local_df) <- c("Sepal_Length", "Avg") 2628 expected <- agg_local_df[order(agg_local_df$Sepal_Length), ] 2629 rownames(expected) <- NULL 2630 expect_identical(actual, expected) 2631}) 2632 2633test_that("Window functions on a DataFrame", { 2634 df <- createDataFrame(list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")), 2635 
schema = c("key", "value")) 2636 ws <- orderBy(windowPartitionBy("key"), "value") 2637 result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws))) 2638 names(result) <- c("key", "value") 2639 expected <- data.frame(key = c(1L, NA, 2L, NA), 2640 value = c("1", NA, "2", NA), 2641 stringsAsFactors = FALSE) 2642 expect_equal(result, expected) 2643 2644 ws <- orderBy(windowPartitionBy(df$key), df$value) 2645 result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws))) 2646 names(result) <- c("key", "value") 2647 expect_equal(result, expected) 2648 2649 ws <- partitionBy(windowOrderBy("value"), "key") 2650 result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws))) 2651 names(result) <- c("key", "value") 2652 expect_equal(result, expected) 2653 2654 ws <- partitionBy(windowOrderBy(df$value), df$key) 2655 result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws))) 2656 names(result) <- c("key", "value") 2657 expect_equal(result, expected) 2658}) 2659 2660test_that("createDataFrame sqlContext parameter backward compatibility", { 2661 sqlContext <- suppressWarnings(sparkRSQL.init(sc)) 2662 a <- 1:3 2663 b <- c("a", "b", "c") 2664 ldf <- data.frame(a, b) 2665 # Call function with namespace :: operator - SPARK-16538 2666 df <- suppressWarnings(SparkR::createDataFrame(sqlContext, ldf)) 2667 expect_equal(columns(df), c("a", "b")) 2668 expect_equal(dtypes(df), list(c("a", "int"), c("b", "string"))) 2669 expect_equal(count(df), 3) 2670 ldf2 <- collect(df) 2671 expect_equal(ldf$a, ldf2$a) 2672 2673 df2 <- suppressWarnings(createDataFrame(sqlContext, iris)) 2674 expect_equal(count(df2), 150) 2675 expect_equal(ncol(df2), 5) 2676 2677 df3 <- suppressWarnings(read.df(sqlContext, jsonPath, "json")) 2678 expect_is(df3, "SparkDataFrame") 2679 expect_equal(count(df3), 3) 2680 2681 before <- suppressWarnings(createDataFrame(sqlContext, iris)) 2682 after <- suppressWarnings(createDataFrame(iris)) 
2683 expect_equal(collect(before), collect(after)) 2684 2685 # more tests for SPARK-16538 2686 createOrReplaceTempView(df, "table") 2687 SparkR::tables() 2688 SparkR::sql("SELECT 1") 2689 suppressWarnings(SparkR::sql(sqlContext, "SELECT * FROM table")) 2690 suppressWarnings(SparkR::dropTempTable(sqlContext, "table")) 2691}) 2692 2693test_that("randomSplit", { 2694 num <- 4000 2695 df <- createDataFrame(data.frame(id = 1:num)) 2696 weights <- c(2, 3, 5) 2697 df_list <- randomSplit(df, weights) 2698 expect_equal(length(weights), length(df_list)) 2699 counts <- sapply(df_list, count) 2700 expect_equal(num, sum(counts)) 2701 expect_true(all(sapply(abs(counts / num - weights / sum(weights)), function(e) { e < 0.05 }))) 2702 2703 df_list <- randomSplit(df, weights, 0) 2704 expect_equal(length(weights), length(df_list)) 2705 counts <- sapply(df_list, count) 2706 expect_equal(num, sum(counts)) 2707 expect_true(all(sapply(abs(counts / num - weights / sum(weights)), function(e) { e < 0.05 }))) 2708}) 2709 2710test_that("Setting and getting config on SparkSession, sparkR.conf(), sparkR.uiWebUrl()", { 2711 # first, set it to a random but known value 2712 conf <- callJMethod(sparkSession, "conf") 2713 property <- paste0("spark.testing.", as.character(runif(1))) 2714 value1 <- as.character(runif(1)) 2715 callJMethod(conf, "set", property, value1) 2716 2717 # next, change the same property to the new value 2718 value2 <- as.character(runif(1)) 2719 l <- list(value2) 2720 names(l) <- property 2721 sparkR.session(sparkConfig = l) 2722 2723 newValue <- unlist(sparkR.conf(property, ""), use.names = FALSE) 2724 expect_equal(value2, newValue) 2725 2726 value <- as.character(runif(1)) 2727 sparkR.session(spark.app.name = "sparkSession test", spark.testing.r.session.r = value) 2728 allconf <- sparkR.conf() 2729 appNameValue <- allconf[["spark.app.name"]] 2730 testValue <- allconf[["spark.testing.r.session.r"]] 2731 expect_equal(appNameValue, "sparkSession test") 2732 
expect_equal(testValue, value) 2733 expect_error(sparkR.conf("completely.dummy"), "Config 'completely.dummy' is not set") 2734 2735 url <- sparkR.uiWebUrl() 2736 expect_equal(substr(url, 1, 7), "http://") 2737}) 2738 2739test_that("enableHiveSupport on SparkSession", { 2740 setHiveContext(sc) 2741 unsetHiveContext() 2742 # if we are still here, it must be built with hive 2743 conf <- callJMethod(sparkSession, "conf") 2744 value <- callJMethod(conf, "get", "spark.sql.catalogImplementation") 2745 expect_equal(value, "hive") 2746}) 2747 2748test_that("Spark version from SparkSession", { 2749 ver <- callJMethod(sc, "version") 2750 version <- sparkR.version() 2751 expect_equal(ver, version) 2752}) 2753 2754test_that("Call DataFrameWriter.save() API in Java without path and check argument types", { 2755 df <- read.df(jsonPath, "json") 2756 # This tests if the exception is thrown from JVM not from SparkR side. 2757 # It makes sure that we can omit path argument in write.df API and then it calls 2758 # DataFrameWriter.save() without path. 2759 expect_error(write.df(df, source = "csv"), 2760 "Error in save : illegal argument - Expected exactly one path to be specified") 2761 expect_error(write.json(df, jsonPath), 2762 "Error in json : analysis error - path file:.*already exists") 2763 expect_error(write.text(df, jsonPath), 2764 "Error in text : analysis error - path file:.*already exists") 2765 expect_error(write.orc(df, jsonPath), 2766 "Error in orc : analysis error - path file:.*already exists") 2767 expect_error(write.parquet(df, jsonPath), 2768 "Error in parquet : analysis error - path file:.*already exists") 2769 2770 # Arguments checking in R side. 2771 expect_error(write.df(df, "data.tmp", source = c(1, 2)), 2772 paste("source should be character, NULL or omitted. 
It is the datasource specified", 2773 "in 'spark.sql.sources.default' configuration by default.")) 2774 expect_error(write.df(df, path = c(3)), 2775 "path should be charactor, NULL or omitted.") 2776 expect_error(write.df(df, mode = TRUE), 2777 "mode should be charactor or omitted. It is 'error' by default.") 2778}) 2779 2780test_that("Call DataFrameWriter.load() API in Java without path and check argument types", { 2781 # This tests if the exception is thrown from JVM not from SparkR side. 2782 # It makes sure that we can omit path argument in read.df API and then it calls 2783 # DataFrameWriter.load() without path. 2784 expect_error(read.df(source = "json"), 2785 paste("Error in loadDF : analysis error - Unable to infer schema for JSON.", 2786 "It must be specified manually")) 2787 expect_error(read.df("arbitrary_path"), "Error in loadDF : analysis error - Path does not exist") 2788 expect_error(read.json("arbitrary_path"), "Error in json : analysis error - Path does not exist") 2789 expect_error(read.text("arbitrary_path"), "Error in text : analysis error - Path does not exist") 2790 expect_error(read.orc("arbitrary_path"), "Error in orc : analysis error - Path does not exist") 2791 expect_error(read.parquet("arbitrary_path"), 2792 "Error in parquet : analysis error - Path does not exist") 2793 2794 # Arguments checking in R side. 2795 expect_error(read.df(path = c(3)), 2796 "path should be charactor, NULL or omitted.") 2797 expect_error(read.df(jsonPath, source = c(1, 2)), 2798 paste("source should be character, NULL or omitted. 
It is the datasource specified", 2799 "in 'spark.sql.sources.default' configuration by default.")) 2800 2801 expect_warning(read.json(jsonPath, a = 1, 2, 3, "a"), 2802 "Unnamed arguments ignored: 2, 3, a.") 2803}) 2804 2805test_that("Collect on DataFrame when NAs exists at the top of a timestamp column", { 2806 ldf <- data.frame(col1 = c(0, 1, 2), 2807 col2 = c(as.POSIXct("2017-01-01 00:00:01"), 2808 NA, 2809 as.POSIXct("2017-01-01 12:00:01")), 2810 col3 = c(as.POSIXlt("2016-01-01 00:59:59"), 2811 NA, 2812 as.POSIXlt("2016-01-01 12:01:01"))) 2813 sdf1 <- createDataFrame(ldf) 2814 ldf1 <- collect(sdf1) 2815 expect_equal(dtypes(sdf1), list(c("col1", "double"), 2816 c("col2", "timestamp"), 2817 c("col3", "timestamp"))) 2818 expect_equal(class(ldf1$col1), "numeric") 2819 expect_equal(class(ldf1$col2), c("POSIXct", "POSIXt")) 2820 expect_equal(class(ldf1$col3), c("POSIXct", "POSIXt")) 2821 2822 # Columns with NAs at the top 2823 sdf2 <- filter(sdf1, "col1 > 1") 2824 ldf2 <- collect(sdf2) 2825 expect_equal(dtypes(sdf2), list(c("col1", "double"), 2826 c("col2", "timestamp"), 2827 c("col3", "timestamp"))) 2828 expect_equal(class(ldf2$col1), "numeric") 2829 expect_equal(class(ldf2$col2), c("POSIXct", "POSIXt")) 2830 expect_equal(class(ldf2$col3), c("POSIXct", "POSIXt")) 2831 2832 # Columns with only NAs, the type will also be cast to PRIMITIVE_TYPE 2833 sdf3 <- filter(sdf1, "col1 == 0") 2834 ldf3 <- collect(sdf3) 2835 expect_equal(dtypes(sdf3), list(c("col1", "double"), 2836 c("col2", "timestamp"), 2837 c("col3", "timestamp"))) 2838 expect_equal(class(ldf3$col1), "numeric") 2839 expect_equal(class(ldf3$col2), c("POSIXct", "POSIXt")) 2840 expect_equal(class(ldf3$col3), c("POSIXct", "POSIXt")) 2841}) 2842 2843compare_list <- function(list1, list2) { 2844 # get testthat to show the diff by first making the 2 lists equal in length 2845 expect_equal(length(list1), length(list2)) 2846 l <- max(length(list1), length(list2)) 2847 length(list1) <- l 2848 length(list2) <- l 2849 
expect_equal(sort(list1, na.last = TRUE), sort(list2, na.last = TRUE)) 2850} 2851 2852# This should always be the **very last test** in this test file. 2853test_that("No extra files are created in SPARK_HOME by starting session and making calls", { 2854 skip_on_cran() 2855 2856 # Check that it is not creating any extra file. 2857 # Does not check the tempdir which would be cleaned up after. 2858 filesAfter <- list.files(path = sparkRDir, all.files = TRUE) 2859 2860 expect_true(length(sparkRFilesBefore) > 0) 2861 # first, ensure derby.log is not there 2862 expect_false("derby.log" %in% filesAfter) 2863 # second, ensure only spark-warehouse is created when calling SparkSession, enableHiveSupport = F 2864 # note: currently all other test files have enableHiveSupport = F, so we capture the list of files 2865 # before creating a SparkSession with enableHiveSupport = T at the top of this test file 2866 # (filesBefore). The test here is to compare that (filesBefore) against the list of files before 2867 # any test is run in run-all.R (sparkRFilesBefore). 2868 # sparkRWhitelistSQLDirs is also defined in run-all.R, and should contain only 2 whitelisted dirs, 2869 # here allow the first value, spark-warehouse, in the diff, everything else should be exactly the 2870 # same as before any test is run. 2871 compare_list(sparkRFilesBefore, setdiff(filesBefore, sparkRWhitelistSQLDirs[[1]])) 2872 # third, ensure only spark-warehouse and metastore_db are created when enableHiveSupport = T 2873 # note: as the note above, after running all tests in this file while enableHiveSupport = T, we 2874 # check the list of files again. This time we allow both whitelisted dirs to be in the diff. 2875 compare_list(sparkRFilesBefore, setdiff(filesAfter, sparkRWhitelistSQLDirs)) 2876}) 2877 2878unlink(parquetPath) 2879unlink(orcPath) 2880unlink(jsonPath) 2881unlink(jsonPathNa) 2882 2883sparkR.session.stop() 2884