1#
2# Licensed to the Apache Software Foundation (ASF) under one or more
3# contributor license agreements.  See the NOTICE file distributed with
4# this work for additional information regarding copyright ownership.
5# The ASF licenses this file to You under the Apache License, Version 2.0
6# (the "License"); you may not use this file except in compliance with
7# the License.  You may obtain a copy of the License at
8#
9#    http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18library(testthat)
19
20context("SparkSQL functions")
21
22# Utility function for easily checking the values of a StructField
23checkStructField <- function(actual, expectedName, expectedType, expectedNullable) {
24  expect_equal(class(actual), "structField")
25  expect_equal(actual$name(), expectedName)
26  expect_equal(actual$dataType.toString(), expectedType)
27  expect_equal(actual$nullable(), expectedNullable)
28}
29
30markUtf8 <- function(s) {
31  Encoding(s) <- "UTF-8"
32  s
33}
34
35setHiveContext <- function(sc) {
36  if (exists(".testHiveSession", envir = .sparkREnv)) {
37    hiveSession <- get(".testHiveSession", envir = .sparkREnv)
38  } else {
39    # initialize once and reuse
40    ssc <- callJMethod(sc, "sc")
41    hiveCtx <- tryCatch({
42      newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc, FALSE)
43    },
44    error = function(err) {
45      skip("Hive is not build with SparkSQL, skipped")
46    })
47    hiveSession <- callJMethod(hiveCtx, "sparkSession")
48  }
49  previousSession <- get(".sparkRsession", envir = .sparkREnv)
50  assign(".sparkRsession", hiveSession, envir = .sparkREnv)
51  assign(".prevSparkRsession", previousSession, envir = .sparkREnv)
52  hiveSession
53}
54
55unsetHiveContext <- function() {
56  previousSession <- get(".prevSparkRsession", envir = .sparkREnv)
57  assign(".sparkRsession", previousSession, envir = .sparkREnv)
58  remove(".prevSparkRsession", envir = .sparkREnv)
59}
60
61# Tests for SparkSQL functions in SparkR
62
63filesBefore <- list.files(path = sparkRDir, all.files = TRUE)
64sparkSession <- sparkR.session()
65sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
66
67mockLines <- c("{\"name\":\"Michael\"}",
68               "{\"name\":\"Andy\", \"age\":30}",
69               "{\"name\":\"Justin\", \"age\":19}")
70jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
71parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
72orcPath <- tempfile(pattern = "sparkr-test", fileext = ".orc")
73writeLines(mockLines, jsonPath)
74
75# For test nafunctions, like dropna(), fillna(),...
76mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
77                 "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
78                 "{\"name\":\"David\",\"age\":60,\"height\":null}",
79                 "{\"name\":\"Amy\",\"age\":null,\"height\":null}",
80                 "{\"name\":null,\"age\":null,\"height\":null}")
81jsonPathNa <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
82writeLines(mockLinesNa, jsonPathNa)
83
84# For test complex types in DataFrame
85mockLinesComplexType <-
86  c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}",
87    "{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}",
88    "{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}")
89complexTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
90writeLines(mockLinesComplexType, complexTypeJsonPath)
91
92test_that("calling sparkRSQL.init returns existing SQL context", {
93  sqlContext <- suppressWarnings(sparkRSQL.init(sc))
94  expect_equal(suppressWarnings(sparkRSQL.init(sc)), sqlContext)
95})
96
97test_that("calling sparkRSQL.init returns existing SparkSession", {
98  expect_equal(suppressWarnings(sparkRSQL.init(sc)), sparkSession)
99})
100
101test_that("calling sparkR.session returns existing SparkSession", {
102  expect_equal(sparkR.session(), sparkSession)
103})
104
105test_that("infer types and check types", {
106  expect_equal(infer_type(1L), "integer")
107  expect_equal(infer_type(1.0), "double")
108  expect_equal(infer_type("abc"), "string")
109  expect_equal(infer_type(TRUE), "boolean")
110  expect_equal(infer_type(as.Date("2015-03-11")), "date")
111  expect_equal(infer_type(as.POSIXlt("2015-03-11 12:13:04.043")), "timestamp")
112  expect_equal(infer_type(c(1L, 2L)), "array<integer>")
113  expect_equal(infer_type(list(1L, 2L)), "array<integer>")
114  expect_equal(infer_type(listToStruct(list(a = 1L, b = "2"))), "struct<a:integer,b:string>")
115  e <- new.env()
116  assign("a", 1L, envir = e)
117  expect_equal(infer_type(e), "map<string,integer>")
118
119  expect_error(checkType("map<integer,integer>"), "Key type in a map must be string or character")
120
121  expect_equal(infer_type(as.raw(c(1, 2, 3))), "binary")
122})
123
124test_that("structType and structField", {
125  testField <- structField("a", "string")
126  expect_is(testField, "structField")
127  expect_equal(testField$name(), "a")
128  expect_true(testField$nullable())
129
130  testSchema <- structType(testField, structField("b", "integer"))
131  expect_is(testSchema, "structType")
132  expect_is(testSchema$fields()[[2]], "structField")
133  expect_equal(testSchema$fields()[[1]]$dataType.toString(), "StringType")
134})
135
136test_that("create DataFrame from RDD", {
137  rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
138  df <- createDataFrame(rdd, list("a", "b"))
139  dfAsDF <- as.DataFrame(rdd, list("a", "b"))
140  expect_is(df, "SparkDataFrame")
141  expect_is(dfAsDF, "SparkDataFrame")
142  expect_equal(count(df), 10)
143  expect_equal(count(dfAsDF), 10)
144  expect_equal(nrow(df), 10)
145  expect_equal(nrow(dfAsDF), 10)
146  expect_equal(ncol(df), 2)
147  expect_equal(ncol(dfAsDF), 2)
148  expect_equal(dim(df), c(10, 2))
149  expect_equal(dim(dfAsDF), c(10, 2))
150  expect_equal(columns(df), c("a", "b"))
151  expect_equal(columns(dfAsDF), c("a", "b"))
152  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
153  expect_equal(dtypes(dfAsDF), list(c("a", "int"), c("b", "string")))
154
155  df <- createDataFrame(rdd)
156  dfAsDF <- as.DataFrame(rdd)
157  expect_is(df, "SparkDataFrame")
158  expect_is(dfAsDF, "SparkDataFrame")
159  expect_equal(columns(df), c("_1", "_2"))
160  expect_equal(columns(dfAsDF), c("_1", "_2"))
161
162  schema <- structType(structField(x = "a", type = "integer", nullable = TRUE),
163                        structField(x = "b", type = "string", nullable = TRUE))
164  df <- createDataFrame(rdd, schema)
165  expect_is(df, "SparkDataFrame")
166  expect_equal(columns(df), c("a", "b"))
167  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
168
169  rdd <- lapply(parallelize(sc, 1:10), function(x) { list(a = x, b = as.character(x)) })
170  df <- createDataFrame(rdd)
171  expect_is(df, "SparkDataFrame")
172  expect_equal(count(df), 10)
173  expect_equal(columns(df), c("a", "b"))
174  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
175
176  schema <- structType(structField("name", "string"), structField("age", "integer"),
177                       structField("height", "float"))
178  df <- read.df(jsonPathNa, "json", schema)
179  df2 <- createDataFrame(toRDD(df), schema)
180  df2AsDF <- as.DataFrame(toRDD(df), schema)
181  expect_equal(columns(df2), c("name", "age", "height"))
182  expect_equal(columns(df2AsDF), c("name", "age", "height"))
183  expect_equal(dtypes(df2), list(c("name", "string"), c("age", "int"), c("height", "float")))
184  expect_equal(dtypes(df2AsDF), list(c("name", "string"), c("age", "int"), c("height", "float")))
185  expect_equal(as.list(collect(where(df2, df2$name == "Bob"))),
186               list(name = "Bob", age = 16, height = 176.5))
187  expect_equal(as.list(collect(where(df2AsDF, df2AsDF$name == "Bob"))),
188               list(name = "Bob", age = 16, height = 176.5))
189
190  localDF <- data.frame(name = c("John", "Smith", "Sarah"),
191                        age = c(19L, 23L, 18L),
192                        height = c(176.5, 181.4, 173.7))
193  df <- createDataFrame(localDF, schema)
194  expect_is(df, "SparkDataFrame")
195  expect_equal(count(df), 3)
196  expect_equal(columns(df), c("name", "age", "height"))
197  expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
198  expect_equal(as.list(collect(where(df, df$name == "John"))),
199               list(name = "John", age = 19L, height = 176.5))
200  expect_equal(getNumPartitions(df), 1)
201
202  df <- as.DataFrame(cars, numPartitions = 2)
203  expect_equal(getNumPartitions(df), 2)
204  df <- createDataFrame(cars, numPartitions = 3)
205  expect_equal(getNumPartitions(df), 3)
206  # validate limit by num of rows
207  df <- createDataFrame(cars, numPartitions = 60)
208  expect_equal(getNumPartitions(df), 50)
209  # validate when 1 < (length(coll) / numSlices) << length(coll)
210  df <- createDataFrame(cars, numPartitions = 20)
211  expect_equal(getNumPartitions(df), 20)
212
213  df <- as.DataFrame(data.frame(0))
214  expect_is(df, "SparkDataFrame")
215  df <- createDataFrame(list(list(1)))
216  expect_is(df, "SparkDataFrame")
217  df <- as.DataFrame(data.frame(0), numPartitions = 2)
218  # no data to partition, goes to 1
219  expect_equal(getNumPartitions(df), 1)
220
221  setHiveContext(sc)
222  sql("CREATE TABLE people (name string, age double, height float)")
223  df <- read.df(jsonPathNa, "json", schema)
224  invisible(insertInto(df, "people"))
225  expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age,
226               c(16))
227  expect_equal(collect(sql("SELECT height from people WHERE name ='Bob'"))$height,
228               c(176.5))
229  sql("DROP TABLE people")
230  unsetHiveContext()
231})
232
233test_that("createDataFrame uses files for large objects", {
234  # To simulate a large file scenario, we set spark.r.maxAllocationLimit to a smaller value
235  conf <- callJMethod(sparkSession, "conf")
236  callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
237  df <- suppressWarnings(createDataFrame(iris, numPartitions = 3))
238  expect_equal(getNumPartitions(df), 3)
239
240  # Resetting the conf back to default value
241  callJMethod(conf, "set", "spark.r.maxAllocationLimit", toString(.Machine$integer.max / 10))
242  expect_equal(dim(df), dim(iris))
243})
244
245test_that("read/write csv as DataFrame", {
246  csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
247  mockLinesCsv <- c("year,make,model,comment,blank",
248                   "\"2012\",\"Tesla\",\"S\",\"No comment\",",
249                   "1997,Ford,E350,\"Go get one now they are going fast\",",
250                   "2015,Chevy,Volt",
251                   "NA,Dummy,Placeholder")
252  writeLines(mockLinesCsv, csvPath)
253
254  # default "header" is false, inferSchema to handle "year" as "int"
255  df <- read.df(csvPath, "csv", header = "true", inferSchema = "true")
256  expect_equal(count(df), 4)
257  expect_equal(columns(df), c("year", "make", "model", "comment", "blank"))
258  expect_equal(sort(unlist(collect(where(df, df$year == 2015)))),
259               sort(unlist(list(year = 2015, make = "Chevy", model = "Volt"))))
260
261  # since "year" is "int", let's skip the NA values
262  withoutna <- na.omit(df, how = "any", cols = "year")
263  expect_equal(count(withoutna), 3)
264
265  unlink(csvPath)
266  csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
267  mockLinesCsv <- c("year,make,model,comment,blank",
268                   "\"2012\",\"Tesla\",\"S\",\"No comment\",",
269                   "1997,Ford,E350,\"Go get one now they are going fast\",",
270                   "2015,Chevy,Volt",
271                   "Empty,Dummy,Placeholder")
272  writeLines(mockLinesCsv, csvPath)
273
274  df2 <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "Empty")
275  expect_equal(count(df2), 4)
276  withoutna2 <- na.omit(df2, how = "any", cols = "year")
277  expect_equal(count(withoutna2), 3)
278  expect_equal(count(where(withoutna2, withoutna2$make == "Dummy")), 0)
279
280  # writing csv file
281  csvPath2 <- tempfile(pattern = "csvtest2", fileext = ".csv")
282  write.df(df2, path = csvPath2, "csv", header = "true")
283  df3 <- read.df(csvPath2, "csv", header = "true")
284  expect_equal(nrow(df3), nrow(df2))
285  expect_equal(colnames(df3), colnames(df2))
286  csv <- read.csv(file = list.files(csvPath2, pattern = "^part", full.names = T)[[1]])
287  expect_equal(colnames(df3), colnames(csv))
288
289  unlink(csvPath)
290  unlink(csvPath2)
291})
292
293test_that("Support other types for options", {
294  csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
295  mockLinesCsv <- c("year,make,model,comment,blank",
296  "\"2012\",\"Tesla\",\"S\",\"No comment\",",
297  "1997,Ford,E350,\"Go get one now they are going fast\",",
298  "2015,Chevy,Volt",
299  "NA,Dummy,Placeholder")
300  writeLines(mockLinesCsv, csvPath)
301
302  csvDf <- read.df(csvPath, "csv", header = "true", inferSchema = "true")
303  expected <- read.df(csvPath, "csv", header = TRUE, inferSchema = TRUE)
304  expect_equal(collect(csvDf), collect(expected))
305
306  expect_error(read.df(csvPath, "csv", header = TRUE, maxColumns = 3))
307  unlink(csvPath)
308})
309
310test_that("convert NAs to null type in DataFrames", {
311  rdd <- parallelize(sc, list(list(1L, 2L), list(NA, 4L)))
312  df <- createDataFrame(rdd, list("a", "b"))
313  expect_true(is.na(collect(df)[2, "a"]))
314  expect_equal(collect(df)[2, "b"], 4L)
315
316  l <- data.frame(x = 1L, y = c(1L, NA_integer_, 3L))
317  df <- createDataFrame(l)
318  expect_equal(collect(df)[2, "x"], 1L)
319  expect_true(is.na(collect(df)[2, "y"]))
320
321  rdd <- parallelize(sc, list(list(1, 2), list(NA, 4)))
322  df <- createDataFrame(rdd, list("a", "b"))
323  expect_true(is.na(collect(df)[2, "a"]))
324  expect_equal(collect(df)[2, "b"], 4)
325
326  l <- data.frame(x = 1, y = c(1, NA_real_, 3))
327  df <- createDataFrame(l)
328  expect_equal(collect(df)[2, "x"], 1)
329  expect_true(is.na(collect(df)[2, "y"]))
330
331  l <- list("a", "b", NA, "d")
332  df <- createDataFrame(l)
333  expect_true(is.na(collect(df)[3, "_1"]))
334  expect_equal(collect(df)[4, "_1"], "d")
335
336  l <- list("a", "b", NA_character_, "d")
337  df <- createDataFrame(l)
338  expect_true(is.na(collect(df)[3, "_1"]))
339  expect_equal(collect(df)[4, "_1"], "d")
340
341  l <- list(TRUE, FALSE, NA, TRUE)
342  df <- createDataFrame(l)
343  expect_true(is.na(collect(df)[3, "_1"]))
344  expect_equal(collect(df)[4, "_1"], TRUE)
345})
346
347test_that("toDF", {
348  rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
349  df <- toDF(rdd, list("a", "b"))
350  expect_is(df, "SparkDataFrame")
351  expect_equal(count(df), 10)
352  expect_equal(columns(df), c("a", "b"))
353  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
354
355  df <- toDF(rdd)
356  expect_is(df, "SparkDataFrame")
357  expect_equal(columns(df), c("_1", "_2"))
358
359  schema <- structType(structField(x = "a", type = "integer", nullable = TRUE),
360                        structField(x = "b", type = "string", nullable = TRUE))
361  df <- toDF(rdd, schema)
362  expect_is(df, "SparkDataFrame")
363  expect_equal(columns(df), c("a", "b"))
364  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
365
366  rdd <- lapply(parallelize(sc, 1:10), function(x) { list(a = x, b = as.character(x)) })
367  df <- toDF(rdd)
368  expect_is(df, "SparkDataFrame")
369  expect_equal(count(df), 10)
370  expect_equal(columns(df), c("a", "b"))
371  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
372})
373
374test_that("create DataFrame from list or data.frame", {
375  l <- list(list(1, 2), list(3, 4))
376  df <- createDataFrame(l, c("a", "b"))
377  expect_equal(columns(df), c("a", "b"))
378
379  l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
380  df <- createDataFrame(l)
381  expect_equal(columns(df), c("a", "b"))
382
383  a <- 1:3
384  b <- c("a", "b", "c")
385  ldf <- data.frame(a, b)
386  df <- createDataFrame(ldf)
387  expect_equal(columns(df), c("a", "b"))
388  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
389  expect_equal(count(df), 3)
390  ldf2 <- collect(df)
391  expect_equal(ldf$a, ldf2$a)
392
393  irisdf <- suppressWarnings(createDataFrame(iris))
394  iris_collected <- collect(irisdf)
395  expect_equivalent(iris_collected[, -5], iris[, -5])
396  expect_equal(iris_collected$Species, as.character(iris$Species))
397
398  mtcarsdf <- createDataFrame(mtcars)
399  expect_equivalent(collect(mtcarsdf), mtcars)
400
401  bytes <- as.raw(c(1, 2, 3))
402  df <- createDataFrame(list(list(bytes)))
403  expect_equal(collect(df)[[1]][[1]], bytes)
404})
405
406test_that("create DataFrame with different data types", {
407  l <- list(a = 1L, b = 2, c = TRUE, d = "ss", e = as.Date("2012-12-13"),
408            f = as.POSIXct("2015-03-15 12:13:14.056"))
409  df <- createDataFrame(list(l))
410  expect_equal(dtypes(df), list(c("a", "int"), c("b", "double"), c("c", "boolean"),
411                                c("d", "string"), c("e", "date"), c("f", "timestamp")))
412  expect_equal(count(df), 1)
413  expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE))
414})
415
416test_that("SPARK-17811: can create DataFrame containing NA as date and time", {
417  df <- data.frame(
418    id = 1:2,
419    time = c(as.POSIXlt("2016-01-10"), NA),
420    date = c(as.Date("2016-10-01"), NA))
421
422  DF <- collect(createDataFrame(df))
423  expect_true(is.na(DF$date[2]))
424  expect_equal(DF$date[1], as.Date("2016-10-01"))
425  expect_true(is.na(DF$time[2]))
426  expect_equal(DF$time[1], as.POSIXlt("2016-01-10"))
427})
428
429test_that("create DataFrame with complex types", {
430  e <- new.env()
431  assign("n", 3L, envir = e)
432
433  s <- listToStruct(list(a = "aa", b = 3L))
434
435  l <- list(as.list(1:10), list("a", "b"), e, s)
436  df <- createDataFrame(list(l), c("a", "b", "c", "d"))
437  expect_equal(dtypes(df), list(c("a", "array<int>"),
438                                c("b", "array<string>"),
439                                c("c", "map<string,int>"),
440                                c("d", "struct<a:string,b:int>")))
441  expect_equal(count(df), 1)
442  ldf <- collect(df)
443  expect_equal(names(ldf), c("a", "b", "c", "d"))
444  expect_equal(ldf[1, 1][[1]], l[[1]])
445  expect_equal(ldf[1, 2][[1]], l[[2]])
446
447  e <- ldf$c[[1]]
448  expect_equal(class(e), "environment")
449  expect_equal(ls(e), "n")
450  expect_equal(e$n, 3L)
451
452  s <- ldf$d[[1]]
453  expect_equal(class(s), "struct")
454  expect_equal(s$a, "aa")
455  expect_equal(s$b, 3L)
456})
457
458test_that("create DataFrame from a data.frame with complex types", {
459  ldf <- data.frame(row.names = 1:2)
460  ldf$a_list <- list(list(1, 2), list(3, 4))
461  ldf$an_envir <- c(as.environment(list(a = 1, b = 2)), as.environment(list(c = 3)))
462
463  sdf <- createDataFrame(ldf)
464  collected <- collect(sdf)
465
466  expect_identical(ldf[, 1, FALSE], collected[, 1, FALSE])
467  expect_equal(ldf$an_envir, collected$an_envir)
468})
469
470# For test map type and struct type in DataFrame
471mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
472                      "{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",
473                      "{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}")
474mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
475writeLines(mockLinesMapType, mapTypeJsonPath)
476
477test_that("Collect DataFrame with complex types", {
478  # ArrayType
479  df <- read.json(complexTypeJsonPath)
480  ldf <- collect(df)
481  expect_equal(nrow(ldf), 3)
482  expect_equal(ncol(ldf), 3)
483  expect_equal(names(ldf), c("c1", "c2", "c3"))
484  expect_equal(ldf$c1, list(list(1, 2, 3), list(4, 5, 6), list (7, 8, 9)))
485  expect_equal(ldf$c2, list(list("a", "b", "c"), list("d", "e", "f"), list ("g", "h", "i")))
486  expect_equal(ldf$c3, list(list(1.0, 2.0, 3.0), list(4.0, 5.0, 6.0), list (7.0, 8.0, 9.0)))
487
488  # MapType
489  schema <- structType(structField("name", "string"),
490                       structField("info", "map<string,double>"))
491  df <- read.df(mapTypeJsonPath, "json", schema)
492  expect_equal(dtypes(df), list(c("name", "string"),
493                                c("info", "map<string,double>")))
494  ldf <- collect(df)
495  expect_equal(nrow(ldf), 3)
496  expect_equal(ncol(ldf), 2)
497  expect_equal(names(ldf), c("name", "info"))
498  expect_equal(ldf$name, c("Bob", "Alice", "David"))
499  bob <- ldf$info[[1]]
500  expect_equal(class(bob), "environment")
501  expect_equal(bob$age, 16)
502  expect_equal(bob$height, 176.5)
503
504  # StructType
505  df <- read.json(mapTypeJsonPath)
506  expect_equal(dtypes(df), list(c("info", "struct<age:bigint,height:double>"),
507                                c("name", "string")))
508  ldf <- collect(df)
509  expect_equal(nrow(ldf), 3)
510  expect_equal(ncol(ldf), 2)
511  expect_equal(names(ldf), c("info", "name"))
512  expect_equal(ldf$name, c("Bob", "Alice", "David"))
513  bob <- ldf$info[[1]]
514  expect_equal(class(bob), "struct")
515  expect_equal(bob$age, 16)
516  expect_equal(bob$height, 176.5)
517})
518
519test_that("read/write json files", {
520  # Test read.df
521  df <- read.df(jsonPath, "json")
522  expect_is(df, "SparkDataFrame")
523  expect_equal(count(df), 3)
524
525  # Test read.df with a user defined schema
526  schema <- structType(structField("name", type = "string"),
527                       structField("age", type = "double"))
528
529  df1 <- read.df(jsonPath, "json", schema)
530  expect_is(df1, "SparkDataFrame")
531  expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))
532
533  # Test loadDF
534  df2 <- loadDF(jsonPath, "json", schema)
535  expect_is(df2, "SparkDataFrame")
536  expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))
537
538  # Test read.json
539  df <- read.json(jsonPath)
540  expect_is(df, "SparkDataFrame")
541  expect_equal(count(df), 3)
542
543  # Test write.df
544  jsonPath2 <- tempfile(pattern = "jsonPath2", fileext = ".json")
545  write.df(df, jsonPath2, "json", mode = "overwrite")
546
547  # Test write.json
548  jsonPath3 <- tempfile(pattern = "jsonPath3", fileext = ".json")
549  write.json(df, jsonPath3)
550
551  # Test read.json()/jsonFile() works with multiple input paths
552  jsonDF1 <- read.json(c(jsonPath2, jsonPath3))
553  expect_is(jsonDF1, "SparkDataFrame")
554  expect_equal(count(jsonDF1), 6)
555  # Suppress warnings because jsonFile is deprecated
556  jsonDF2 <- suppressWarnings(jsonFile(c(jsonPath2, jsonPath3)))
557  expect_is(jsonDF2, "SparkDataFrame")
558  expect_equal(count(jsonDF2), 6)
559
560  unlink(jsonPath2)
561  unlink(jsonPath3)
562})
563
564test_that("read/write json files - compression option", {
565  df <- read.df(jsonPath, "json")
566
567  jsonPath <- tempfile(pattern = "jsonPath", fileext = ".json")
568  write.json(df, jsonPath, compression = "gzip")
569  jsonDF <- read.json(jsonPath)
570  expect_is(jsonDF, "SparkDataFrame")
571  expect_equal(count(jsonDF), count(df))
572  expect_true(length(list.files(jsonPath, pattern = ".gz")) > 0)
573
574  unlink(jsonPath)
575})
576
577test_that("jsonRDD() on a RDD with json string", {
578  sqlContext <- suppressWarnings(sparkRSQL.init(sc))
579  rdd <- parallelize(sc, mockLines)
580  expect_equal(countRDD(rdd), 3)
581  df <- suppressWarnings(jsonRDD(sqlContext, rdd))
582  expect_is(df, "SparkDataFrame")
583  expect_equal(count(df), 3)
584
585  rdd2 <- flatMap(rdd, function(x) c(x, x))
586  df <- suppressWarnings(jsonRDD(sqlContext, rdd2))
587  expect_is(df, "SparkDataFrame")
588  expect_equal(count(df), 6)
589})
590
591test_that("test tableNames and tables", {
592  df <- read.json(jsonPath)
593  createOrReplaceTempView(df, "table1")
594  expect_equal(length(tableNames()), 1)
595  tables <- tables()
596  expect_equal(count(tables), 1)
597
598  suppressWarnings(registerTempTable(df, "table2"))
599  tables <- tables()
600  expect_equal(count(tables), 2)
601  suppressWarnings(dropTempTable("table1"))
602  expect_true(dropTempView("table2"))
603
604  tables <- tables()
605  expect_equal(count(tables), 0)
606})
607
608test_that(
609  "createOrReplaceTempView() results in a queryable table and sql() results in a new DataFrame", {
610  df <- read.json(jsonPath)
611  createOrReplaceTempView(df, "table1")
612  newdf <- sql("SELECT * FROM table1 where name = 'Michael'")
613  expect_is(newdf, "SparkDataFrame")
614  expect_equal(count(newdf), 1)
615  expect_true(dropTempView("table1"))
616
617  createOrReplaceTempView(df, "dfView")
618  sqlCast <- collect(sql("select cast('2' as decimal) as x from dfView limit 1"))
619  out <- capture.output(sqlCast)
620  expect_true(is.data.frame(sqlCast))
621  expect_equal(names(sqlCast)[1], "x")
622  expect_equal(nrow(sqlCast), 1)
623  expect_equal(ncol(sqlCast), 1)
624  expect_equal(out[1], "  x")
625  expect_equal(out[2], "1 2")
626  expect_true(dropTempView("dfView"))
627})
628
629test_that("test cache, uncache and clearCache", {
630  df <- read.json(jsonPath)
631  createOrReplaceTempView(df, "table1")
632  cacheTable("table1")
633  uncacheTable("table1")
634  clearCache()
635  expect_true(dropTempView("table1"))
636})
637
638test_that("insertInto() on a registered table", {
639  df <- read.df(jsonPath, "json")
640  write.df(df, parquetPath, "parquet", "overwrite")
641  dfParquet <- read.df(parquetPath, "parquet")
642
643  lines <- c("{\"name\":\"Bob\", \"age\":24}",
644             "{\"name\":\"James\", \"age\":35}")
645  jsonPath2 <- tempfile(pattern = "jsonPath2", fileext = ".tmp")
646  parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
647  writeLines(lines, jsonPath2)
648  df2 <- read.df(jsonPath2, "json")
649  write.df(df2, parquetPath2, "parquet", "overwrite")
650  dfParquet2 <- read.df(parquetPath2, "parquet")
651
652  createOrReplaceTempView(dfParquet, "table1")
653  insertInto(dfParquet2, "table1")
654  expect_equal(count(sql("select * from table1")), 5)
655  expect_equal(first(sql("select * from table1 order by age"))$name, "Michael")
656  expect_true(dropTempView("table1"))
657
658  createOrReplaceTempView(dfParquet, "table1")
659  insertInto(dfParquet2, "table1", overwrite = TRUE)
660  expect_equal(count(sql("select * from table1")), 2)
661  expect_equal(first(sql("select * from table1 order by age"))$name, "Bob")
662  expect_true(dropTempView("table1"))
663
664  unlink(jsonPath2)
665  unlink(parquetPath2)
666})
667
668test_that("tableToDF() returns a new DataFrame", {
669  df <- read.json(jsonPath)
670  createOrReplaceTempView(df, "table1")
671  tabledf <- tableToDF("table1")
672  expect_is(tabledf, "SparkDataFrame")
673  expect_equal(count(tabledf), 3)
674  tabledf2 <- tableToDF("table1")
675  expect_equal(count(tabledf2), 3)
676  expect_true(dropTempView("table1"))
677})
678
679test_that("toRDD() returns an RRDD", {
680  df <- read.json(jsonPath)
681  testRDD <- toRDD(df)
682  expect_is(testRDD, "RDD")
683  expect_equal(countRDD(testRDD), 3)
684})
685
686test_that("union on two RDDs created from DataFrames returns an RRDD", {
687  df <- read.json(jsonPath)
688  RDD1 <- toRDD(df)
689  RDD2 <- toRDD(df)
690  unioned <- unionRDD(RDD1, RDD2)
691  expect_is(unioned, "RDD")
692  expect_equal(getSerializedMode(unioned), "byte")
693  expect_equal(collectRDD(unioned)[[2]]$name, "Andy")
694})
695
696test_that("union on mixed serialization types correctly returns a byte RRDD", {
697  # Byte RDD
698  nums <- 1:10
699  rdd <- parallelize(sc, nums, 2L)
700
701  # String RDD
702  textLines <- c("Michael",
703                 "Andy, 30",
704                 "Justin, 19")
705  textPath <- tempfile(pattern = "sparkr-textLines", fileext = ".tmp")
706  writeLines(textLines, textPath)
707  textRDD <- textFile(sc, textPath)
708
709  df <- read.json(jsonPath)
710  dfRDD <- toRDD(df)
711
712  unionByte <- unionRDD(rdd, dfRDD)
713  expect_is(unionByte, "RDD")
714  expect_equal(getSerializedMode(unionByte), "byte")
715  expect_equal(collectRDD(unionByte)[[1]], 1)
716  expect_equal(collectRDD(unionByte)[[12]]$name, "Andy")
717
718  unionString <- unionRDD(textRDD, dfRDD)
719  expect_is(unionString, "RDD")
720  expect_equal(getSerializedMode(unionString), "byte")
721  expect_equal(collectRDD(unionString)[[1]], "Michael")
722  expect_equal(collectRDD(unionString)[[5]]$name, "Andy")
723})
724
725test_that("objectFile() works with row serialization", {
726  objectPath <- tempfile(pattern = "spark-test", fileext = ".tmp")
727  df <- read.json(jsonPath)
728  dfRDD <- toRDD(df)
729  saveAsObjectFile(coalesceRDD(dfRDD, 1L), objectPath)
730  objectIn <- objectFile(sc, objectPath)
731
732  expect_is(objectIn, "RDD")
733  expect_equal(getSerializedMode(objectIn), "byte")
734  expect_equal(collectRDD(objectIn)[[2]]$age, 30)
735})
736
737test_that("lapply() on a DataFrame returns an RDD with the correct columns", {
738  df <- read.json(jsonPath)
739  testRDD <- lapply(df, function(row) {
740    row$newCol <- row$age + 5
741    row
742    })
743  expect_is(testRDD, "RDD")
744  collected <- collectRDD(testRDD)
745  expect_equal(collected[[1]]$name, "Michael")
746  expect_equal(collected[[2]]$newCol, 35)
747})
748
749test_that("collect() returns a data.frame", {
750  df <- read.json(jsonPath)
751  rdf <- collect(df)
752  expect_true(is.data.frame(rdf))
753  expect_equal(names(rdf)[1], "age")
754  expect_equal(nrow(rdf), 3)
755  expect_equal(ncol(rdf), 2)
756
757  # collect() returns data correctly from a DataFrame with 0 row
758  df0 <- limit(df, 0)
759  rdf <- collect(df0)
760  expect_true(is.data.frame(rdf))
761  expect_equal(names(rdf)[1], "age")
762  expect_equal(nrow(rdf), 0)
763  expect_equal(ncol(rdf), 2)
764
765  # collect() correctly handles multiple columns with same name
766  df <- createDataFrame(list(list(1, 2)), schema = c("name", "name"))
767  ldf <- collect(df)
768  expect_equal(names(ldf), c("name", "name"))
769})
770
771test_that("limit() returns DataFrame with the correct number of rows", {
772  df <- read.json(jsonPath)
773  dfLimited <- limit(df, 2)
774  expect_is(dfLimited, "SparkDataFrame")
775  expect_equal(count(dfLimited), 2)
776})
777
778test_that("collect() and take() on a DataFrame return the same number of rows and columns", {
779  df <- read.json(jsonPath)
780  expect_equal(nrow(collect(df)), nrow(take(df, 10)))
781  expect_equal(ncol(collect(df)), ncol(take(df, 10)))
782})
783
784test_that("collect() support Unicode characters", {
785  lines <- c("{\"name\":\"안녕하세요\"}",
786             "{\"name\":\"您好\", \"age\":30}",
787             "{\"name\":\"こんにちは\", \"age\":19}",
788             "{\"name\":\"Xin chào\"}")
789
790  jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
791  writeLines(lines, jsonPath)
792
793  df <- read.df(jsonPath, "json")
794  rdf <- collect(df)
795  expect_true(is.data.frame(rdf))
796  expect_equal(rdf$name[1], markUtf8("안녕하세요"))
797  expect_equal(rdf$name[2], markUtf8("您好"))
798  expect_equal(rdf$name[3], markUtf8("こんにちは"))
799  expect_equal(rdf$name[4], markUtf8("Xin chào"))
800
801  df1 <- createDataFrame(rdf)
802  expect_equal(collect(where(df1, df1$name == markUtf8("您好")))$name, markUtf8("您好"))
803})
804
805test_that("multiple pipeline transformations result in an RDD with the correct values", {
806  df <- read.json(jsonPath)
807  first <- lapply(df, function(row) {
808    row$age <- row$age + 5
809    row
810  })
811  second <- lapply(first, function(row) {
812    row$testCol <- if (row$age == 35 && !is.na(row$age)) TRUE else FALSE
813    row
814  })
815  expect_is(second, "RDD")
816  expect_equal(countRDD(second), 3)
817  expect_equal(collectRDD(second)[[2]]$age, 35)
818  expect_true(collectRDD(second)[[2]]$testCol)
819  expect_false(collectRDD(second)[[3]]$testCol)
820})
821
822test_that("cache(), storageLevel(), persist(), and unpersist() on a DataFrame", {
823  df <- read.json(jsonPath)
824  expect_false(df@env$isCached)
825  cache(df)
826  expect_true(df@env$isCached)
827
828  unpersist(df)
829  expect_false(df@env$isCached)
830
831  persist(df, "MEMORY_AND_DISK")
832  expect_true(df@env$isCached)
833
834  expect_equal(storageLevel(df),
835    "MEMORY_AND_DISK - StorageLevel(disk, memory, deserialized, 1 replicas)")
836
837  unpersist(df)
838  expect_false(df@env$isCached)
839
840  # make sure the data is collectable
841  expect_true(is.data.frame(collect(df)))
842})
843
844test_that("schema(), dtypes(), columns(), names() return the correct values/format", {
845  df <- read.json(jsonPath)
846  testSchema <- schema(df)
847  expect_equal(length(testSchema$fields()), 2)
848  expect_equal(testSchema$fields()[[1]]$dataType.toString(), "LongType")
849  expect_equal(testSchema$fields()[[2]]$dataType.simpleString(), "string")
850  expect_equal(testSchema$fields()[[1]]$name(), "age")
851
852  testTypes <- dtypes(df)
853  expect_equal(length(testTypes[[1]]), 2)
854  expect_equal(testTypes[[1]][1], "age")
855
856  testCols <- columns(df)
857  expect_equal(length(testCols), 2)
858  expect_equal(testCols[2], "name")
859
860  testNames <- names(df)
861  expect_equal(length(testNames), 2)
862  expect_equal(testNames[2], "name")
863})
864
865test_that("names() colnames() set the column names", {
866  df <- read.json(jsonPath)
867  names(df) <- c("col1", "col2")
868  expect_equal(colnames(df)[2], "col2")
869
870  colnames(df) <- c("col3", "col4")
871  expect_equal(names(df)[1], "col3")
872
873  expect_error(colnames(df) <- c("sepal.length", "sepal_width"),
874               "Column names cannot contain the '.' symbol.")
875  expect_error(colnames(df) <- c(1, 2), "Invalid column names.")
876  expect_error(colnames(df) <- c("a"),
877               "Column names must have the same length as the number of columns in the dataset.")
878  expect_error(colnames(df) <- c("1", NA), "Column names cannot be NA.")
879
880  # Note: if this test is broken, remove check for "." character on colnames<- method
881  irisDF <- suppressWarnings(createDataFrame(iris))
882  expect_equal(names(irisDF)[1], "Sepal_Length")
883
884  # Test base::colnames base::names
885  m2 <- cbind(1, 1:4)
886  expect_equal(colnames(m2, do.NULL = FALSE), c("col1", "col2"))
887  colnames(m2) <- c("x", "Y")
888  expect_equal(colnames(m2), c("x", "Y"))
889
890  z <- list(a = 1, b = "c", c = 1:3)
891  expect_equal(names(z)[3], "c")
892  names(z)[3] <- "c2"
893  expect_equal(names(z)[3], "c2")
894})
895
896test_that("head() and first() return the correct data", {
897  df <- read.json(jsonPath)
898  testHead <- head(df)
899  expect_equal(nrow(testHead), 3)
900  expect_equal(ncol(testHead), 2)
901
902  testHead2 <- head(df, 2)
903  expect_equal(nrow(testHead2), 2)
904  expect_equal(ncol(testHead2), 2)
905
906  testFirst <- first(df)
907  expect_equal(nrow(testFirst), 1)
908
909  # head() and first() return the correct data on
910  # a DataFrame with 0 row
911  df0 <- limit(df, 0)
912
913  testHead <- head(df0)
914  expect_equal(nrow(testHead), 0)
915  expect_equal(ncol(testHead), 2)
916
917  testFirst <- first(df0)
918  expect_equal(nrow(testFirst), 0)
919  expect_equal(ncol(testFirst), 2)
920})
921
922test_that("distinct(), unique() and dropDuplicates() on DataFrames", {
923  lines <- c("{\"name\":\"Michael\"}",
924             "{\"name\":\"Andy\", \"age\":30}",
925             "{\"name\":\"Justin\", \"age\":19}",
926             "{\"name\":\"Justin\", \"age\":19}")
927  jsonPathWithDup <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
928  writeLines(lines, jsonPathWithDup)
929
930  df <- read.json(jsonPathWithDup)
931  uniques <- distinct(df)
932  expect_is(uniques, "SparkDataFrame")
933  expect_equal(count(uniques), 3)
934
935  uniques2 <- unique(df)
936  expect_is(uniques2, "SparkDataFrame")
937  expect_equal(count(uniques2), 3)
938
939  # Test dropDuplicates()
940  df <- createDataFrame(
941    list(
942      list(2, 1, 2), list(1, 1, 1),
943      list(1, 2, 1), list(2, 1, 2),
944      list(2, 2, 2), list(2, 2, 1),
945      list(2, 1, 1), list(1, 1, 2),
946      list(1, 2, 2), list(1, 2, 1)),
947    schema = c("key", "value1", "value2"))
948  result <- collect(dropDuplicates(df))
949  expected <- rbind.data.frame(
950    c(1, 1, 1), c(1, 1, 2), c(1, 2, 1),
951    c(1, 2, 2), c(2, 1, 1), c(2, 1, 2),
952    c(2, 2, 1), c(2, 2, 2))
953  names(expected) <- c("key", "value1", "value2")
954  expect_equivalent(
955    result[order(result$key, result$value1, result$value2), ],
956    expected)
957
958  result <- collect(dropDuplicates(df, c("key", "value1")))
959  expected <- rbind.data.frame(
960    c(1, 1, 1), c(1, 2, 1), c(2, 1, 2), c(2, 2, 2))
961  names(expected) <- c("key", "value1", "value2")
962  expect_equivalent(
963    result[order(result$key, result$value1, result$value2), ],
964    expected)
965
966  result <- collect(dropDuplicates(df, "key", "value1"))
967  expected <- rbind.data.frame(
968    c(1, 1, 1), c(1, 2, 1), c(2, 1, 2), c(2, 2, 2))
969  names(expected) <- c("key", "value1", "value2")
970  expect_equivalent(
971    result[order(result$key, result$value1, result$value2), ],
972    expected)
973
974  result <- collect(dropDuplicates(df, "key"))
975  expected <- rbind.data.frame(
976    c(1, 1, 1), c(2, 1, 2))
977  names(expected) <- c("key", "value1", "value2")
978  expect_equivalent(
979    result[order(result$key, result$value1, result$value2), ],
980    expected)
981})
982
983test_that("sample on a DataFrame", {
984  df <- read.json(jsonPath)
985  sampled <- sample(df, FALSE, 1.0)
986  expect_equal(nrow(collect(sampled)), count(df))
987  expect_is(sampled, "SparkDataFrame")
988  sampled2 <- sample(df, FALSE, 0.1, 0) # set seed for predictable result
989  expect_true(count(sampled2) < 3)
990
991  count1 <- count(sample(df, FALSE, 0.1, 0))
992  count2 <- count(sample(df, FALSE, 0.1, 0))
993  expect_equal(count1, count2)
994
995  # Also test sample_frac
996  sampled3 <- sample_frac(df, FALSE, 0.1, 0) # set seed for predictable result
997  expect_true(count(sampled3) < 3)
998
999  # nolint start
1000  # Test base::sample is working
1001  #expect_equal(length(sample(1:12)), 12)
1002  # nolint end
1003})
1004
1005test_that("select operators", {
1006  df <- select(read.json(jsonPath), "name", "age")
1007  expect_is(df$name, "Column")
1008  expect_is(df[[2]], "Column")
1009  expect_is(df[["age"]], "Column")
1010
1011  expect_warning(df[[1:2]],
1012                 "Subset index has length > 1. Only the first index is used.")
1013  expect_is(suppressWarnings(df[[1:2]]), "Column")
1014  expect_warning(df[[c("name", "age")]],
1015                 "Subset index has length > 1. Only the first index is used.")
1016  expect_is(suppressWarnings(df[[c("name", "age")]]), "Column")
1017
1018  expect_warning(df[[1:2]] <- df[[1]],
1019                 "Subset index has length > 1. Only the first index is used.")
1020  expect_warning(df[[c("name", "age")]] <- df[[1]],
1021                 "Subset index has length > 1. Only the first index is used.")
1022
1023  expect_is(df[, 1, drop = F], "SparkDataFrame")
1024  expect_equal(columns(df[, 1, drop = F]), c("name"))
1025  expect_equal(columns(df[, "age", drop = F]), c("age"))
1026
1027  df2 <- df[, c("age", "name")]
1028  expect_is(df2, "SparkDataFrame")
1029  expect_equal(columns(df2), c("age", "name"))
1030
1031  df$age2 <- df$age
1032  expect_equal(columns(df), c("name", "age", "age2"))
1033  expect_equal(count(where(df, df$age2 == df$age)), 2)
1034  df$age2 <- df$age * 2
1035  expect_equal(columns(df), c("name", "age", "age2"))
1036  expect_equal(count(where(df, df$age2 == df$age * 2)), 2)
1037  df$age2 <- df[["age"]] * 3
1038  expect_equal(columns(df), c("name", "age", "age2"))
1039  expect_equal(count(where(df, df$age2 == df$age * 3)), 2)
1040
1041  df$age2 <- 21
1042  expect_equal(columns(df), c("name", "age", "age2"))
1043  expect_equal(count(where(df, df$age2 == 21)), 3)
1044
1045  df$age2 <- c(22)
1046  expect_equal(columns(df), c("name", "age", "age2"))
1047  expect_equal(count(where(df, df$age2 == 22)), 3)
1048
1049  expect_error(df$age3 <- c(22, NA),
1050              "value must be a Column, literal value as atomic in length of 1, or NULL")
1051
1052  df[["age2"]] <- 23
1053  expect_equal(columns(df), c("name", "age", "age2"))
1054  expect_equal(count(where(df, df$age2 == 23)), 3)
1055
1056  df[[3]] <- 24
1057  expect_equal(columns(df), c("name", "age", "age2"))
1058  expect_equal(count(where(df, df$age2 == 24)), 3)
1059
1060  df[[3]] <- df$age
1061  expect_equal(count(where(df, df$age2 == df$age)), 2)
1062
1063  df[["age2"]] <- df[["name"]]
1064  expect_equal(count(where(df, df$age2 == df$name)), 3)
1065
1066  expect_error(df[["age3"]] <- c(22, 23),
1067              "value must be a Column, literal value as atomic in length of 1, or NULL")
1068
1069  # Test parameter drop
1070  expect_equal(class(df[, 1]) == "SparkDataFrame", T)
1071  expect_equal(class(df[, 1, drop = T]) == "Column", T)
1072  expect_equal(class(df[, 1, drop = F]) == "SparkDataFrame", T)
1073  expect_equal(class(df[df$age > 4, 2, drop = T]) == "Column", T)
1074  expect_equal(class(df[df$age > 4, 2, drop = F]) == "SparkDataFrame", T)
1075})
1076
1077test_that("select with column", {
1078  df <- read.json(jsonPath)
1079  df1 <- select(df, "name")
1080  expect_equal(columns(df1), c("name"))
1081  expect_equal(count(df1), 3)
1082
1083  df2 <- select(df, df$age)
1084  expect_equal(columns(df2), c("age"))
1085  expect_equal(count(df2), 3)
1086
1087  df3 <- select(df, lit("x"))
1088  expect_equal(columns(df3), c("x"))
1089  expect_equal(count(df3), 3)
1090  expect_equal(collect(select(df3, "x"))[[1, 1]], "x")
1091
1092  df4 <- select(df, c("name", "age"))
1093  expect_equal(columns(df4), c("name", "age"))
1094  expect_equal(count(df4), 3)
1095
1096  expect_error(select(df, c("name", "age"), "name"),
1097                "To select multiple columns, use a character vector or list for col")
1098})
1099
1100test_that("drop column", {
1101  df <- select(read.json(jsonPath), "name", "age")
1102  df1 <- drop(df, "name")
1103  expect_equal(columns(df1), c("age"))
1104
1105  df$age2 <- df$age
1106  df1 <- drop(df, c("name", "age"))
1107  expect_equal(columns(df1), c("age2"))
1108
1109  df1 <- drop(df, df$age)
1110  expect_equal(columns(df1), c("name", "age2"))
1111
1112  df$age2 <- NULL
1113  expect_equal(columns(df), c("name", "age"))
1114  df$age3 <- NULL
1115  expect_equal(columns(df), c("name", "age"))
1116
1117  # Test to make sure base::drop is not masked
1118  expect_equal(drop(1:3 %*% 2:4), 20)
1119})
1120
1121test_that("subsetting", {
1122  # read.json returns columns in random order
1123  df <- select(read.json(jsonPath), "name", "age")
1124  filtered <- df[df$age > 20, ]
1125  expect_equal(count(filtered), 1)
1126  expect_equal(columns(filtered), c("name", "age"))
1127  expect_equal(collect(filtered)$name, "Andy")
1128
1129  df2 <- df[df$age == 19, 1, drop = F]
1130  expect_is(df2, "SparkDataFrame")
1131  expect_equal(count(df2), 1)
1132  expect_equal(columns(df2), c("name"))
1133  expect_equal(collect(df2)$name, "Justin")
1134
1135  df3 <- df[df$age > 20, 2, drop = F]
1136  expect_equal(count(df3), 1)
1137  expect_equal(columns(df3), c("age"))
1138
1139  df4 <- df[df$age %in% c(19, 30), 1:2]
1140  expect_equal(count(df4), 2)
1141  expect_equal(columns(df4), c("name", "age"))
1142
1143  df5 <- df[df$age %in% c(19), c(1, 2)]
1144  expect_equal(count(df5), 1)
1145  expect_equal(columns(df5), c("name", "age"))
1146
1147  df6 <- subset(df, df$age %in% c(30), c(1, 2))
1148  expect_equal(count(df6), 1)
1149  expect_equal(columns(df6), c("name", "age"))
1150
1151  df7 <- subset(df, select = "name", drop = F)
1152  expect_equal(count(df7), 3)
1153  expect_equal(columns(df7), c("name"))
1154
1155  # Test base::subset is working
1156  expect_equal(nrow(subset(airquality, Temp > 80, select = c(Ozone, Temp))), 68)
1157})
1158
1159test_that("selectExpr() on a DataFrame", {
1160  df <- read.json(jsonPath)
1161  selected <- selectExpr(df, "age * 2")
1162  expect_equal(names(selected), "(age * 2)")
1163  expect_equal(collect(selected), collect(select(df, df$age * 2L)))
1164
1165  selected2 <- selectExpr(df, "name as newName", "abs(age) as age")
1166  expect_equal(names(selected2), c("newName", "age"))
1167  expect_equal(count(selected2), 3)
1168})
1169
1170test_that("expr() on a DataFrame", {
1171  df <- read.json(jsonPath)
1172  expect_equal(collect(select(df, expr("abs(-123)")))[1, 1], 123)
1173})
1174
1175test_that("column calculation", {
1176  df <- read.json(jsonPath)
1177  d <- collect(select(df, alias(df$age + 1, "age2")))
1178  expect_equal(names(d), c("age2"))
1179  df2 <- select(df, lower(df$name), abs(df$age))
1180  expect_is(df2, "SparkDataFrame")
1181  expect_equal(count(df2), 3)
1182})
1183
1184test_that("test HiveContext", {
1185  setHiveContext(sc)
1186  df <- createExternalTable("json", jsonPath, "json")
1187  expect_is(df, "SparkDataFrame")
1188  expect_equal(count(df), 3)
1189  df2 <- sql("select * from json")
1190  expect_is(df2, "SparkDataFrame")
1191  expect_equal(count(df2), 3)
1192
1193  jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
1194  invisible(saveAsTable(df, "json2", "json", "append", path = jsonPath2))
1195  df3 <- sql("select * from json2")
1196  expect_is(df3, "SparkDataFrame")
1197  expect_equal(count(df3), 3)
1198  unlink(jsonPath2)
1199
1200  hivetestDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
1201  invisible(saveAsTable(df, "hivetestbl", path = hivetestDataPath))
1202  df4 <- sql("select * from hivetestbl")
1203  expect_is(df4, "SparkDataFrame")
1204  expect_equal(count(df4), 3)
1205  unlink(hivetestDataPath)
1206
1207  parquetDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
1208  invisible(saveAsTable(df, "parquetest", "parquet", mode = "overwrite", path = parquetDataPath))
1209  df5 <- sql("select * from parquetest")
1210  expect_is(df5, "SparkDataFrame")
1211  expect_equal(count(df5), 3)
1212  unlink(parquetDataPath)
1213  unsetHiveContext()
1214})
1215
1216test_that("column operators", {
1217  c <- column("a")
1218  c2 <- (- c + 1 - 2) * 3 / 4.0
1219  c3 <- (c + c2 - c2) * c2 %% c2
1220  c4 <- (c > c2) & (c2 <= c3) | (c == c2) & (c2 != c3)
1221  c5 <- c2 ^ c3 ^ c4
1222})
1223
1224test_that("column functions", {
1225  c <- column("a")
1226  c1 <- abs(c) + acos(c) + approxCountDistinct(c) + ascii(c) + asin(c) + atan(c)
1227  c2 <- avg(c) + base64(c) + bin(c) + bitwiseNOT(c) + cbrt(c) + ceil(c) + cos(c)
1228  c3 <- cosh(c) + count(c) + crc32(c) + hash(c) + exp(c)
1229  c4 <- explode(c) + expm1(c) + factorial(c) + first(c) + floor(c) + hex(c)
1230  c5 <- hour(c) + initcap(c) + last(c) + last_day(c) + length(c)
1231  c6 <- log(c) + (c) + log1p(c) + log2(c) + lower(c) + ltrim(c) + max(c) + md5(c)
1232  c7 <- mean(c) + min(c) + month(c) + negate(c) + posexplode(c) + quarter(c)
1233  c8 <- reverse(c) + rint(c) + round(c) + rtrim(c) + sha1(c) + monotonically_increasing_id()
1234  c9 <- signum(c) + sin(c) + sinh(c) + size(c) + stddev(c) + soundex(c) + sqrt(c) + sum(c)
1235  c10 <- sumDistinct(c) + tan(c) + tanh(c) + toDegrees(c) + toRadians(c)
1236  c11 <- to_date(c) + trim(c) + unbase64(c) + unhex(c) + upper(c)
1237  c12 <- variance(c)
1238  c13 <- lead("col", 1) + lead(c, 1) + lag("col", 1) + lag(c, 1)
1239  c14 <- cume_dist() + ntile(1) + corr(c, c1)
1240  c15 <- dense_rank() + percent_rank() + rank() + row_number()
1241  c16 <- is.nan(c) + isnan(c) + isNaN(c)
1242  c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", "c1")
1243  c18 <- covar_pop(c, c1) + covar_pop("c", "c1")
1244  c19 <- spark_partition_id() + coalesce(c) + coalesce(c1, c2, c3)
1245
1246  # Test if base::is.nan() is exposed
1247  expect_equal(is.nan(c("a", "b")), c(FALSE, FALSE))
1248
1249  # Test if base::rank() is exposed
1250  expect_equal(class(rank())[[1]], "Column")
1251  expect_equal(rank(1:3), as.numeric(c(1:3)))
1252
1253  df <- read.json(jsonPath)
1254  df2 <- select(df, between(df$age, c(20, 30)), between(df$age, c(10, 20)))
1255  expect_equal(collect(df2)[[2, 1]], TRUE)
1256  expect_equal(collect(df2)[[2, 2]], FALSE)
1257  expect_equal(collect(df2)[[3, 1]], FALSE)
1258  expect_equal(collect(df2)[[3, 2]], TRUE)
1259
1260  df3 <- select(df, between(df$name, c("Apache", "Spark")))
1261  expect_equal(collect(df3)[[1, 1]], TRUE)
1262  expect_equal(collect(df3)[[2, 1]], FALSE)
1263  expect_equal(collect(df3)[[3, 1]], TRUE)
1264
1265  df4 <- select(df, countDistinct(df$age, df$name))
1266  expect_equal(collect(df4)[[1, 1]], 2)
1267
1268  expect_equal(collect(select(df, sum(df$age)))[1, 1], 49)
1269  expect_true(abs(collect(select(df, stddev(df$age)))[1, 1] - 7.778175) < 1e-6)
1270  expect_equal(collect(select(df, var_pop(df$age)))[1, 1], 30.25)
1271
1272  df5 <- createDataFrame(list(list(a = "010101")))
1273  expect_equal(collect(select(df5, conv(df5$a, 2, 16)))[1, 1], "15")
1274
1275  # Test array_contains() and sort_array()
1276  df <- createDataFrame(list(list(list(1L, 2L, 3L)), list(list(6L, 5L, 4L))))
1277  result <- collect(select(df, array_contains(df[[1]], 1L)))[[1]]
1278  expect_equal(result, c(TRUE, FALSE))
1279
1280  result <- collect(select(df, sort_array(df[[1]], FALSE)))[[1]]
1281  expect_equal(result, list(list(3L, 2L, 1L), list(6L, 5L, 4L)))
1282  result <- collect(select(df, sort_array(df[[1]])))[[1]]
1283  expect_equal(result, list(list(1L, 2L, 3L), list(4L, 5L, 6L)))
1284
1285  # Test that stats::lag is working
1286  expect_equal(length(lag(ldeaths, 12)), 72)
1287
1288  # Test struct()
1289  df <- createDataFrame(list(list(1L, 2L, 3L), list(4L, 5L, 6L)),
1290                        schema = c("a", "b", "c"))
1291  result <- collect(select(df, alias(struct("a", "c"), "d")))
1292  expected <- data.frame(row.names = 1:2)
1293  expected$"d" <- list(listToStruct(list(a = 1L, c = 3L)),
1294                      listToStruct(list(a = 4L, c = 6L)))
1295  expect_equal(result, expected)
1296
1297  result <- collect(select(df, alias(struct(df$a, df$b), "d")))
1298  expected <- data.frame(row.names = 1:2)
1299  expected$"d" <- list(listToStruct(list(a = 1L, b = 2L)),
1300                      listToStruct(list(a = 4L, b = 5L)))
1301  expect_equal(result, expected)
1302
1303  # Test encode(), decode()
1304  bytes <- as.raw(c(0xe5, 0xa4, 0xa7, 0xe5, 0x8d, 0x83, 0xe4, 0xb8, 0x96, 0xe7, 0x95, 0x8c))
1305  df <- createDataFrame(list(list(markUtf8("大千世界"), "utf-8", bytes)),
1306                        schema = c("a", "b", "c"))
1307  result <- collect(select(df, encode(df$a, "utf-8"), decode(df$c, "utf-8")))
1308  expect_equal(result[[1]][[1]], bytes)
1309  expect_equal(result[[2]], markUtf8("大千世界"))
1310
1311  # Test first(), last()
1312  df <- read.json(jsonPath)
1313  expect_equal(collect(select(df, first(df$age)))[[1]], NA_real_)
1314  expect_equal(collect(select(df, first(df$age, TRUE)))[[1]], 30)
1315  expect_equal(collect(select(df, first("age")))[[1]], NA_real_)
1316  expect_equal(collect(select(df, first("age", TRUE)))[[1]], 30)
1317  expect_equal(collect(select(df, last(df$age)))[[1]], 19)
1318  expect_equal(collect(select(df, last(df$age, TRUE)))[[1]], 19)
1319  expect_equal(collect(select(df, last("age")))[[1]], 19)
1320  expect_equal(collect(select(df, last("age", TRUE)))[[1]], 19)
1321
1322  # Test bround()
1323  df <- createDataFrame(data.frame(x = c(2.5, 3.5)))
1324  expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2)
1325  expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)
1326})
1327
1328test_that("column binary mathfunctions", {
1329  lines <- c("{\"a\":1, \"b\":5}",
1330             "{\"a\":2, \"b\":6}",
1331             "{\"a\":3, \"b\":7}",
1332             "{\"a\":4, \"b\":8}")
1333  jsonPathWithDup <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
1334  writeLines(lines, jsonPathWithDup)
1335  df <- read.json(jsonPathWithDup)
1336  expect_equal(collect(select(df, atan2(df$a, df$b)))[1, "ATAN2(a, b)"], atan2(1, 5))
1337  expect_equal(collect(select(df, atan2(df$a, df$b)))[2, "ATAN2(a, b)"], atan2(2, 6))
1338  expect_equal(collect(select(df, atan2(df$a, df$b)))[3, "ATAN2(a, b)"], atan2(3, 7))
1339  expect_equal(collect(select(df, atan2(df$a, df$b)))[4, "ATAN2(a, b)"], atan2(4, 8))
1340  ## nolint start
1341  expect_equal(collect(select(df, hypot(df$a, df$b)))[1, "HYPOT(a, b)"], sqrt(1^2 + 5^2))
1342  expect_equal(collect(select(df, hypot(df$a, df$b)))[2, "HYPOT(a, b)"], sqrt(2^2 + 6^2))
1343  expect_equal(collect(select(df, hypot(df$a, df$b)))[3, "HYPOT(a, b)"], sqrt(3^2 + 7^2))
1344  expect_equal(collect(select(df, hypot(df$a, df$b)))[4, "HYPOT(a, b)"], sqrt(4^2 + 8^2))
1345  ## nolint end
1346  expect_equal(collect(select(df, shiftLeft(df$b, 1)))[4, 1], 16)
1347  expect_equal(collect(select(df, shiftRight(df$b, 1)))[4, 1], 4)
1348  expect_equal(collect(select(df, shiftRightUnsigned(df$b, 1)))[4, 1], 4)
1349  expect_equal(class(collect(select(df, rand()))[2, 1]), "numeric")
1350  expect_equal(collect(select(df, rand(1)))[1, 1], 0.134, tolerance = 0.01)
1351  expect_equal(class(collect(select(df, randn()))[2, 1]), "numeric")
1352  expect_equal(collect(select(df, randn(1)))[1, 1], -1.03, tolerance = 0.01)
1353})
1354
1355test_that("string operators", {
1356  df <- read.json(jsonPath)
1357  expect_equal(count(where(df, like(df$name, "A%"))), 1)
1358  expect_equal(count(where(df, startsWith(df$name, "A"))), 1)
1359  expect_true(first(select(df, startsWith(df$name, "M")))[[1]])
1360  expect_false(first(select(df, startsWith(df$name, "m")))[[1]])
1361  expect_true(first(select(df, endsWith(df$name, "el")))[[1]])
1362  expect_equal(first(select(df, substr(df$name, 1, 2)))[[1]], "Mi")
1363  if (as.numeric(R.version$major) >= 3 && as.numeric(R.version$minor) >= 3) {
1364    expect_true(startsWith("Hello World", "Hello"))
1365    expect_false(endsWith("Hello World", "a"))
1366  }
1367  expect_equal(collect(select(df, cast(df$age, "string")))[[2, 1]], "30")
1368  expect_equal(collect(select(df, concat(df$name, lit(":"), df$age)))[[2, 1]], "Andy:30")
1369  expect_equal(collect(select(df, concat_ws(":", df$name)))[[2, 1]], "Andy")
1370  expect_equal(collect(select(df, concat_ws(":", df$name, df$age)))[[2, 1]], "Andy:30")
1371  expect_equal(collect(select(df, instr(df$name, "i")))[, 1], c(2, 0, 5))
1372  expect_equal(collect(select(df, format_number(df$age, 2)))[2, 1], "30.00")
1373  expect_equal(collect(select(df, sha1(df$name)))[2, 1],
1374               "ab5a000e88b5d9d0fa2575f5c6263eb93452405d")
1375  expect_equal(collect(select(df, sha2(df$name, 256)))[2, 1],
1376               "80f2aed3c618c423ddf05a2891229fba44942d907173152442cf6591441ed6dc")
1377  expect_equal(collect(select(df, format_string("Name:%s", df$name)))[2, 1], "Name:Andy")
1378  expect_equal(collect(select(df, format_string("%s, %d", df$name, df$age)))[2, 1], "Andy, 30")
1379  expect_equal(collect(select(df, regexp_extract(df$name, "(n.y)", 1)))[2, 1], "ndy")
1380  expect_equal(collect(select(df, regexp_replace(df$name, "(n.y)", "ydn")))[2, 1], "Aydn")
1381
1382  l2 <- list(list(a = "aaads"))
1383  df2 <- createDataFrame(l2)
1384  expect_equal(collect(select(df2, locate("aa", df2$a)))[1, 1], 1)
1385  expect_equal(collect(select(df2, locate("aa", df2$a, 2)))[1, 1], 2)
1386  expect_equal(collect(select(df2, lpad(df2$a, 8, "#")))[1, 1], "###aaads") # nolint
1387  expect_equal(collect(select(df2, rpad(df2$a, 8, "#")))[1, 1], "aaads###") # nolint
1388
1389  l3 <- list(list(a = "a.b.c.d"))
1390  df3 <- createDataFrame(l3)
1391  expect_equal(collect(select(df3, substring_index(df3$a, ".", 2)))[1, 1], "a.b")
1392  expect_equal(collect(select(df3, substring_index(df3$a, ".", -3)))[1, 1], "b.c.d")
1393  expect_equal(collect(select(df3, translate(df3$a, "bc", "12")))[1, 1], "a.1.2.d")
1394})
1395
1396test_that("date functions on a DataFrame", {
1397  .originalTimeZone <- Sys.getenv("TZ")
1398  Sys.setenv(TZ = "UTC")
1399  l <- list(list(a = 1L, b = as.Date("2012-12-13")),
1400            list(a = 2L, b = as.Date("2013-12-14")),
1401            list(a = 3L, b = as.Date("2014-12-15")))
1402  df <- createDataFrame(l)
1403  expect_equal(collect(select(df, dayofmonth(df$b)))[, 1], c(13, 14, 15))
1404  expect_equal(collect(select(df, dayofyear(df$b)))[, 1], c(348, 348, 349))
1405  expect_equal(collect(select(df, weekofyear(df$b)))[, 1], c(50, 50, 51))
1406  expect_equal(collect(select(df, year(df$b)))[, 1], c(2012, 2013, 2014))
1407  expect_equal(collect(select(df, month(df$b)))[, 1], c(12, 12, 12))
1408  expect_equal(collect(select(df, last_day(df$b)))[, 1],
1409               c(as.Date("2012-12-31"), as.Date("2013-12-31"), as.Date("2014-12-31")))
1410  expect_equal(collect(select(df, next_day(df$b, "MONDAY")))[, 1],
1411               c(as.Date("2012-12-17"), as.Date("2013-12-16"), as.Date("2014-12-22")))
1412  expect_equal(collect(select(df, date_format(df$b, "y")))[, 1], c("2012", "2013", "2014"))
1413  expect_equal(collect(select(df, add_months(df$b, 3)))[, 1],
1414               c(as.Date("2013-03-13"), as.Date("2014-03-14"), as.Date("2015-03-15")))
1415  expect_equal(collect(select(df, date_add(df$b, 1)))[, 1],
1416               c(as.Date("2012-12-14"), as.Date("2013-12-15"), as.Date("2014-12-16")))
1417  expect_equal(collect(select(df, date_sub(df$b, 1)))[, 1],
1418               c(as.Date("2012-12-12"), as.Date("2013-12-13"), as.Date("2014-12-14")))
1419
1420  l2 <- list(list(a = 1L, b = as.POSIXlt("2012-12-13 12:34:00", tz = "UTC")),
1421            list(a = 2L, b = as.POSIXlt("2014-12-15 01:24:34", tz = "UTC")))
1422  df2 <- createDataFrame(l2)
1423  expect_equal(collect(select(df2, minute(df2$b)))[, 1], c(34, 24))
1424  expect_equal(collect(select(df2, second(df2$b)))[, 1], c(0, 34))
1425  expect_equal(collect(select(df2, from_utc_timestamp(df2$b, "JST")))[, 1],
1426               c(as.POSIXlt("2012-12-13 21:34:00 UTC"), as.POSIXlt("2014-12-15 10:24:34 UTC")))
1427  expect_equal(collect(select(df2, to_utc_timestamp(df2$b, "JST")))[, 1],
1428               c(as.POSIXlt("2012-12-13 03:34:00 UTC"), as.POSIXlt("2014-12-14 16:24:34 UTC")))
1429  expect_gt(collect(select(df2, unix_timestamp()))[1, 1], 0)
1430  expect_gt(collect(select(df2, unix_timestamp(df2$b)))[1, 1], 0)
1431  expect_gt(collect(select(df2, unix_timestamp(lit("2015-01-01"), "yyyy-MM-dd")))[1, 1], 0)
1432
1433  l3 <- list(list(a = 1000), list(a = -1000))
1434  df3 <- createDataFrame(l3)
1435  result31 <- collect(select(df3, from_unixtime(df3$a)))
1436  expect_equal(grep("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}", result31[, 1], perl = TRUE),
1437               c(1, 2))
1438  result32 <- collect(select(df3, from_unixtime(df3$a, "yyyy")))
1439  expect_equal(grep("\\d{4}", result32[, 1]), c(1, 2))
1440  Sys.setenv(TZ = .originalTimeZone)
1441})
1442
1443test_that("greatest() and least() on a DataFrame", {
1444  l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
1445  df <- createDataFrame(l)
1446  expect_equal(collect(select(df, greatest(df$a, df$b)))[, 1], c(2, 4))
1447  expect_equal(collect(select(df, least(df$a, df$b)))[, 1], c(1, 3))
1448})
1449
1450test_that("time windowing (window()) with all inputs", {
1451  df <- createDataFrame(data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
1452  df$window <- window(df$t, "5 seconds", "5 seconds", "0 seconds")
1453  local <- collect(df)$v
1454  # Not checking time windows because of possible time zone issues. Just checking that the function
1455  # works
1456  expect_equal(local, c(1))
1457})
1458
1459test_that("time windowing (window()) with slide duration", {
1460  df <- createDataFrame(data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
1461  df$window <- window(df$t, "5 seconds", "2 seconds")
1462  local <- collect(df)$v
1463  # Not checking time windows because of possible time zone issues. Just checking that the function
1464  # works
1465  expect_equal(local, c(1, 1))
1466})
1467
1468test_that("time windowing (window()) with start time", {
1469  df <- createDataFrame(data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
1470  df$window <- window(df$t, "5 seconds", startTime = "2 seconds")
1471  local <- collect(df)$v
1472  # Not checking time windows because of possible time zone issues. Just checking that the function
1473  # works
1474  expect_equal(local, c(1))
1475})
1476
1477test_that("time windowing (window()) with just window duration", {
1478  df <- createDataFrame(data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
1479  df$window <- window(df$t, "5 seconds")
1480  local <- collect(df)$v
1481  # Not checking time windows because of possible time zone issues. Just checking that the function
1482  # works
1483  expect_equal(local, c(1))
1484})
1485
1486test_that("when(), otherwise() and ifelse() on a DataFrame", {
1487  l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
1488  df <- createDataFrame(l)
1489  expect_equal(collect(select(df, when(df$a > 1 & df$b > 2, 1)))[, 1], c(NA, 1))
1490  expect_equal(collect(select(df, otherwise(when(df$a > 1, 1), 0)))[, 1], c(0, 1))
1491  expect_equal(collect(select(df, ifelse(df$a > 1 & df$b > 2, 0, 1)))[, 1], c(1, 0))
1492})
1493
1494test_that("when(), otherwise() and ifelse() with column on a DataFrame", {
1495  l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
1496  df <- createDataFrame(l)
1497  expect_equal(collect(select(df, when(df$a > 1 & df$b > 2, lit(1))))[, 1], c(NA, 1))
1498  expect_equal(collect(select(df, otherwise(when(df$a > 1, lit(1)), lit(0))))[, 1], c(0, 1))
1499  expect_equal(collect(select(df, ifelse(df$a > 1 & df$b > 2, lit(0), lit(1))))[, 1], c(1, 0))
1500})
1501
1502test_that("group by, agg functions", {
1503  df <- read.json(jsonPath)
1504  df1 <- agg(df, name = "max", age = "sum")
1505  expect_equal(1, count(df1))
1506  df1 <- agg(df, age2 = max(df$age))
1507  expect_equal(1, count(df1))
1508  expect_equal(columns(df1), c("age2"))
1509
1510  gd <- groupBy(df, "name")
1511  expect_is(gd, "GroupedData")
1512  df2 <- count(gd)
1513  expect_is(df2, "SparkDataFrame")
1514  expect_equal(3, count(df2))
1515
1516  # Also test group_by, summarize, mean
1517  gd1 <- group_by(df, "name")
1518  expect_is(gd1, "GroupedData")
1519  df_summarized <- summarize(gd, mean_age = mean(df$age))
1520  expect_is(df_summarized, "SparkDataFrame")
1521  expect_equal(3, count(df_summarized))
1522
1523  df3 <- agg(gd, age = "stddev")
1524  expect_is(df3, "SparkDataFrame")
1525  df3_local <- collect(df3)
1526  expect_true(is.nan(df3_local[df3_local$name == "Andy", ][1, 2]))
1527
1528  df4 <- agg(gd, sumAge = sum(df$age))
1529  expect_is(df4, "SparkDataFrame")
1530  expect_equal(3, count(df4))
1531  expect_equal(columns(df4), c("name", "sumAge"))
1532
1533  df5 <- sum(gd, "age")
1534  expect_is(df5, "SparkDataFrame")
1535  expect_equal(3, count(df5))
1536
1537  expect_equal(3, count(mean(gd)))
1538  expect_equal(3, count(max(gd)))
1539  expect_equal(30, collect(max(gd))[2, 2])
1540  expect_equal(1, collect(count(gd))[1, 2])
1541
1542  mockLines2 <- c("{\"name\":\"ID1\", \"value\": \"10\"}",
1543                  "{\"name\":\"ID1\", \"value\": \"10\"}",
1544                  "{\"name\":\"ID1\", \"value\": \"22\"}",
1545                  "{\"name\":\"ID2\", \"value\": \"-3\"}")
1546  jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
1547  writeLines(mockLines2, jsonPath2)
1548  gd2 <- groupBy(read.json(jsonPath2), "name")
1549  df6 <- agg(gd2, value = "sum")
1550  df6_local <- collect(df6)
1551  expect_equal(42, df6_local[df6_local$name == "ID1", ][1, 2])
1552  expect_equal(-3, df6_local[df6_local$name == "ID2", ][1, 2])
1553
1554  df7 <- agg(gd2, value = "stddev")
1555  df7_local <- collect(df7)
1556  expect_true(abs(df7_local[df7_local$name == "ID1", ][1, 2] - 6.928203) < 1e-6)
1557  expect_true(is.nan(df7_local[df7_local$name == "ID2", ][1, 2]))
1558
1559  mockLines3 <- c("{\"name\":\"Andy\", \"age\":30}",
1560                  "{\"name\":\"Andy\", \"age\":30}",
1561                  "{\"name\":\"Justin\", \"age\":19}",
1562                  "{\"name\":\"Justin\", \"age\":1}")
1563  jsonPath3 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
1564  writeLines(mockLines3, jsonPath3)
1565  df8 <- read.json(jsonPath3)
1566  gd3 <- groupBy(df8, "name")
1567  gd3_local <- collect(sum(gd3))
1568  expect_equal(60, gd3_local[gd3_local$name == "Andy", ][1, 2])
1569  expect_equal(20, gd3_local[gd3_local$name == "Justin", ][1, 2])
1570
1571  expect_true(abs(collect(agg(df, sd(df$age)))[1, 1] - 7.778175) < 1e-6)
1572  gd3_local <- collect(agg(gd3, var(df8$age)))
1573  expect_equal(162, gd3_local[gd3_local$name == "Justin", ][1, 2])
1574
1575  # Test stats::sd, stats::var are working
1576  expect_true(abs(sd(1:2) - 0.7071068) < 1e-6)
1577  expect_true(abs(var(1:5, 1:5) - 2.5) < 1e-6)
1578
1579  unlink(jsonPath2)
1580  unlink(jsonPath3)
1581})
1582
1583test_that("pivot GroupedData column", {
1584  df <- createDataFrame(data.frame(
1585    earnings = c(10000, 10000, 11000, 15000, 12000, 20000, 21000, 22000),
1586    course = c("R", "Python", "R", "Python", "R", "Python", "R", "Python"),
1587    year = c(2013, 2013, 2014, 2014, 2015, 2015, 2016, 2016)
1588  ))
1589  sum1 <- collect(sum(pivot(groupBy(df, "year"), "course"), "earnings"))
1590  sum2 <- collect(sum(pivot(groupBy(df, "year"), "course", c("Python", "R")), "earnings"))
1591  sum3 <- collect(sum(pivot(groupBy(df, "year"), "course", list("Python", "R")), "earnings"))
1592  sum4 <- collect(sum(pivot(groupBy(df, "year"), "course", "R"), "earnings"))
1593
1594  correct_answer <- data.frame(
1595    year = c(2013, 2014, 2015, 2016),
1596    Python = c(10000, 15000, 20000, 22000),
1597    R = c(10000, 11000, 12000, 21000)
1598  )
1599  expect_equal(sum1, correct_answer)
1600  expect_equal(sum2, correct_answer)
1601  expect_equal(sum3, correct_answer)
1602  expect_equal(sum4, correct_answer[, c("year", "R")])
1603
1604  expect_error(collect(sum(pivot(groupBy(df, "year"), "course", c("R", "R")), "earnings")))
1605  expect_error(collect(sum(pivot(groupBy(df, "year"), "course", list("R", "R")), "earnings")))
1606})
1607
1608test_that("arrange() and orderBy() on a DataFrame", {
1609  df <- read.json(jsonPath)
1610  sorted <- arrange(df, df$age)
1611  expect_equal(collect(sorted)[1, 2], "Michael")
1612
1613  sorted2 <- arrange(df, "name", decreasing = FALSE)
1614  expect_equal(collect(sorted2)[2, "age"], 19)
1615
1616  sorted3 <- orderBy(df, asc(df$age))
1617  expect_true(is.na(first(sorted3)$age))
1618  expect_equal(collect(sorted3)[2, "age"], 19)
1619
1620  sorted4 <- orderBy(df, desc(df$name))
1621  expect_equal(first(sorted4)$name, "Michael")
1622  expect_equal(collect(sorted4)[3, "name"], "Andy")
1623
1624  sorted5 <- arrange(df, "age", "name", decreasing = TRUE)
1625  expect_equal(collect(sorted5)[1, 2], "Andy")
1626
1627  sorted6 <- arrange(df, "age", "name", decreasing = c(T, F))
1628  expect_equal(collect(sorted6)[1, 2], "Andy")
1629
1630  sorted7 <- arrange(df, "name", decreasing = FALSE)
1631  expect_equal(collect(sorted7)[2, "age"], 19)
1632})
1633
1634test_that("filter() on a DataFrame", {
1635  df <- read.json(jsonPath)
1636  filtered <- filter(df, "age > 20")
1637  expect_equal(count(filtered), 1)
1638  expect_equal(collect(filtered)$name, "Andy")
1639  filtered2 <- where(df, df$name != "Michael")
1640  expect_equal(count(filtered2), 2)
1641  expect_equal(collect(filtered2)$age[2], 19)
1642
1643  # test suites for %in%
1644  filtered3 <- filter(df, "age in (19)")
1645  expect_equal(count(filtered3), 1)
1646  filtered4 <- filter(df, "age in (19, 30)")
1647  expect_equal(count(filtered4), 2)
1648  filtered5 <- where(df, df$age %in% c(19))
1649  expect_equal(count(filtered5), 1)
1650  filtered6 <- where(df, df$age %in% c(19, 30))
1651  expect_equal(count(filtered6), 2)
1652
1653  # Test stats::filter is working
1654  #expect_true(is.ts(filter(1:100, rep(1, 3)))) # nolint
1655})
1656
1657test_that("join(), crossJoin() and merge() on a DataFrame", {
1658  df <- read.json(jsonPath)
1659
1660  mockLines2 <- c("{\"name\":\"Michael\", \"test\": \"yes\"}",
1661                  "{\"name\":\"Andy\",  \"test\": \"no\"}",
1662                  "{\"name\":\"Justin\", \"test\": \"yes\"}",
1663                  "{\"name\":\"Bob\", \"test\": \"yes\"}")
1664  jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
1665  writeLines(mockLines2, jsonPath2)
1666  df2 <- read.json(jsonPath2)
1667
1668  # inner join, not cartesian join
1669  expect_equal(count(where(join(df, df2), df$name == df2$name)), 3)
1670  # cartesian join
1671  expect_error(tryCatch(count(join(df, df2)), error = function(e) { stop(e) }),
1672               paste0(".*(org.apache.spark.sql.AnalysisException: Detected cartesian product for",
1673                      " INNER join between logical plans).*"))
1674
1675  joined <- crossJoin(df, df2)
1676  expect_equal(names(joined), c("age", "name", "name", "test"))
1677  expect_equal(count(joined), 12)
1678  expect_equal(names(collect(joined)), c("age", "name", "name", "test"))
1679
1680  joined2 <- join(df, df2, df$name == df2$name)
1681  expect_equal(names(joined2), c("age", "name", "name", "test"))
1682  expect_equal(count(joined2), 3)
1683
1684  joined3 <- join(df, df2, df$name == df2$name, "rightouter")
1685  expect_equal(names(joined3), c("age", "name", "name", "test"))
1686  expect_equal(count(joined3), 4)
1687  expect_true(is.na(collect(orderBy(joined3, joined3$age))$age[2]))
1688
1689  joined4 <- select(join(df, df2, df$name == df2$name, "outer"),
1690                    alias(df$age + 5, "newAge"), df$name, df2$test)
1691  expect_equal(names(joined4), c("newAge", "name", "test"))
1692  expect_equal(count(joined4), 4)
1693  expect_equal(collect(orderBy(joined4, joined4$name))$newAge[3], 24)
1694
1695  joined5 <- join(df, df2, df$name == df2$name, "leftouter")
1696  expect_equal(names(joined5), c("age", "name", "name", "test"))
1697  expect_equal(count(joined5), 3)
1698  expect_true(is.na(collect(orderBy(joined5, joined5$age))$age[1]))
1699
1700  joined6 <- join(df, df2, df$name == df2$name, "inner")
1701  expect_equal(names(joined6), c("age", "name", "name", "test"))
1702  expect_equal(count(joined6), 3)
1703
1704  joined7 <- join(df, df2, df$name == df2$name, "leftsemi")
1705  expect_equal(names(joined7), c("age", "name"))
1706  expect_equal(count(joined7), 3)
1707
1708  joined8 <- join(df, df2, df$name == df2$name, "left_outer")
1709  expect_equal(names(joined8), c("age", "name", "name", "test"))
1710  expect_equal(count(joined8), 3)
1711  expect_true(is.na(collect(orderBy(joined8, joined8$age))$age[1]))
1712
1713  joined9 <- join(df, df2, df$name == df2$name, "right_outer")
1714  expect_equal(names(joined9), c("age", "name", "name", "test"))
1715  expect_equal(count(joined9), 4)
1716  expect_true(is.na(collect(orderBy(joined9, joined9$age))$age[2]))
1717
1718  merged <- merge(df, df2, by.x = "name", by.y = "name", all.x = TRUE, all.y = TRUE)
1719  expect_equal(count(merged), 4)
1720  expect_equal(names(merged), c("age", "name_x", "name_y", "test"))
1721  expect_equal(collect(orderBy(merged, merged$name_x))$age[3], 19)
1722
1723  merged <- merge(df, df2, suffixes = c("-X", "-Y"))
1724  expect_equal(count(merged), 3)
1725  expect_equal(names(merged), c("age", "name-X", "name-Y", "test"))
1726  expect_equal(collect(orderBy(merged, merged$"name-X"))$age[1], 30)
1727
1728  merged <- merge(df, df2, by = "name", suffixes = c("-X", "-Y"), sort = FALSE)
1729  expect_equal(count(merged), 3)
1730  expect_equal(names(merged), c("age", "name-X", "name-Y", "test"))
1731  expect_equal(collect(orderBy(merged, merged$"name-Y"))$"name-X"[3], "Michael")
1732
1733  merged <- merge(df, df2, by = "name", all = T, sort = T)
1734  expect_equal(count(merged), 4)
1735  expect_equal(names(merged), c("age", "name_x", "name_y", "test"))
1736  expect_equal(collect(orderBy(merged, merged$"name_y"))$"name_x"[1], "Andy")
1737
1738  merged <- merge(df, df2, by = NULL)
1739  expect_equal(count(merged), 12)
1740  expect_equal(names(merged), c("age", "name", "name", "test"))
1741
1742  mockLines3 <- c("{\"name\":\"Michael\", \"name_y\":\"Michael\", \"test\": \"yes\"}",
1743                  "{\"name\":\"Andy\", \"name_y\":\"Andy\", \"test\": \"no\"}",
1744                  "{\"name\":\"Justin\", \"name_y\":\"Justin\", \"test\": \"yes\"}",
1745                  "{\"name\":\"Bob\", \"name_y\":\"Bob\", \"test\": \"yes\"}")
1746  jsonPath3 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
1747  writeLines(mockLines3, jsonPath3)
1748  df3 <- read.json(jsonPath3)
1749  expect_error(merge(df, df3),
1750               paste("The following column name: name_y occurs more than once in the 'DataFrame'.",
1751                     "Please use different suffixes for the intersected columns.", sep = ""))
1752
1753  unlink(jsonPath2)
1754  unlink(jsonPath3)
1755})
1756
1757test_that("toJSON() returns an RDD of the correct values", {
1758  df <- read.json(jsonPath)
1759  testRDD <- toJSON(df)
1760  expect_is(testRDD, "RDD")
1761  expect_equal(getSerializedMode(testRDD), "string")
1762  expect_equal(collectRDD(testRDD)[[1]], mockLines[1])
1763})
1764
1765test_that("showDF()", {
1766  df <- read.json(jsonPath)
1767  expected <- paste("+----+-------+\n",
1768                    "| age|   name|\n",
1769                    "+----+-------+\n",
1770                    "|null|Michael|\n",
1771                    "|  30|   Andy|\n",
1772                    "|  19| Justin|\n",
1773                    "+----+-------+\n", sep = "")
1774  expected2 <- paste("+---+----+\n",
1775                     "|age|name|\n",
1776                     "+---+----+\n",
1777                     "|nul| Mic|\n",
1778                     "| 30| And|\n",
1779                     "| 19| Jus|\n",
1780                     "+---+----+\n", sep = "")
1781  expect_output(showDF(df), expected)
1782  expect_output(showDF(df, truncate = 3), expected2)
1783})
1784
1785test_that("isLocal()", {
1786  df <- read.json(jsonPath)
1787  expect_false(isLocal(df))
1788})
1789
1790test_that("union(), rbind(), except(), and intersect() on a DataFrame", {
1791  df <- read.json(jsonPath)
1792
1793  lines <- c("{\"name\":\"Bob\", \"age\":24}",
1794             "{\"name\":\"Andy\", \"age\":30}",
1795             "{\"name\":\"James\", \"age\":35}")
1796  jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
1797  writeLines(lines, jsonPath2)
1798  df2 <- read.df(jsonPath2, "json")
1799
1800  unioned <- arrange(union(df, df2), df$age)
1801  expect_is(unioned, "SparkDataFrame")
1802  expect_equal(count(unioned), 6)
1803  expect_equal(first(unioned)$name, "Michael")
1804  expect_equal(count(arrange(suppressWarnings(unionAll(df, df2)), df$age)), 6)
1805
1806  unioned2 <- arrange(rbind(unioned, df, df2), df$age)
1807  expect_is(unioned2, "SparkDataFrame")
1808  expect_equal(count(unioned2), 12)
1809  expect_equal(first(unioned2)$name, "Michael")
1810
1811  excepted <- arrange(except(df, df2), desc(df$age))
1812  expect_is(unioned, "SparkDataFrame")
1813  expect_equal(count(excepted), 2)
1814  expect_equal(first(excepted)$name, "Justin")
1815
1816  intersected <- arrange(intersect(df, df2), df$age)
1817  expect_is(unioned, "SparkDataFrame")
1818  expect_equal(count(intersected), 1)
1819  expect_equal(first(intersected)$name, "Andy")
1820
1821  # Test base::union is working
1822  expect_equal(union(c(1:3), c(3:5)), c(1:5))
1823
1824  # Test base::rbind is working
1825  expect_equal(length(rbind(1:4, c = 2, a = 10, 10, deparse.level = 0)), 16)
1826
1827  # Test base::intersect is working
1828  expect_equal(length(intersect(1:20, 3:23)), 18)
1829
1830  unlink(jsonPath2)
1831})
1832
1833test_that("withColumn() and withColumnRenamed()", {
1834  df <- read.json(jsonPath)
1835  newDF <- withColumn(df, "newAge", df$age + 2)
1836  expect_equal(length(columns(newDF)), 3)
1837  expect_equal(columns(newDF)[3], "newAge")
1838  expect_equal(first(filter(newDF, df$name != "Michael"))$newAge, 32)
1839
1840  # Replace existing column
1841  newDF <- withColumn(df, "age", df$age + 2)
1842  expect_equal(length(columns(newDF)), 2)
1843  expect_equal(first(filter(newDF, df$name != "Michael"))$age, 32)
1844
1845  newDF <- withColumn(df, "age", 18)
1846  expect_equal(length(columns(newDF)), 2)
1847  expect_equal(first(newDF)$age, 18)
1848
1849  expect_error(withColumn(df, "age", list("a")),
1850              "Literal value must be atomic in length of 1")
1851
1852  newDF2 <- withColumnRenamed(df, "age", "newerAge")
1853  expect_equal(length(columns(newDF2)), 2)
1854  expect_equal(columns(newDF2)[1], "newerAge")
1855})
1856
1857test_that("mutate(), transform(), rename() and names()", {
1858  df <- read.json(jsonPath)
1859  newDF <- mutate(df, newAge = df$age + 2)
1860  expect_equal(length(columns(newDF)), 3)
1861  expect_equal(columns(newDF)[3], "newAge")
1862  expect_equal(first(filter(newDF, df$name != "Michael"))$newAge, 32)
1863
1864  newDF <- mutate(df, age = df$age + 2, newAge = df$age + 3)
1865  expect_equal(length(columns(newDF)), 3)
1866  expect_equal(columns(newDF)[3], "newAge")
1867  expect_equal(first(filter(newDF, df$name != "Michael"))$newAge, 33)
1868  expect_equal(first(filter(newDF, df$name != "Michael"))$age, 32)
1869
1870  newDF <- mutate(df, age = df$age + 2, newAge = df$age + 3,
1871                  age = df$age + 4, newAge = df$age + 5)
1872  expect_equal(length(columns(newDF)), 3)
1873  expect_equal(columns(newDF)[3], "newAge")
1874  expect_equal(first(filter(newDF, df$name != "Michael"))$newAge, 35)
1875  expect_equal(first(filter(newDF, df$name != "Michael"))$age, 34)
1876
1877  newDF <- mutate(df, df$age + 3)
1878  expect_equal(length(columns(newDF)), 3)
1879  expect_equal(columns(newDF)[[3]], "df$age + 3")
1880  expect_equal(first(filter(newDF, df$name != "Michael"))[[3]], 33)
1881
1882  newDF2 <- rename(df, newerAge = df$age)
1883  expect_equal(length(columns(newDF2)), 2)
1884  expect_equal(columns(newDF2)[1], "newerAge")
1885
1886  names(newDF2) <- c("newerName", "evenNewerAge")
1887  expect_equal(length(names(newDF2)), 2)
1888  expect_equal(names(newDF2)[1], "newerName")
1889
1890  transformedDF <- transform(df, newAge = -df$age, newAge2 = df$age / 2)
1891  expect_equal(length(columns(transformedDF)), 4)
1892  expect_equal(columns(transformedDF)[3], "newAge")
1893  expect_equal(columns(transformedDF)[4], "newAge2")
1894  expect_equal(first(filter(transformedDF, transformedDF$name == "Andy"))$newAge, -30)
1895
1896  # test if base::transform on local data frames works
1897  # ensure the proper signature is used - otherwise this will fail to run
1898  attach(airquality)
1899  result <- transform(Ozone, logOzone = log(Ozone))
1900  expect_equal(nrow(result), 153)
1901  expect_equal(ncol(result), 2)
1902  detach(airquality)
1903})
1904
1905test_that("read/write ORC files", {
1906  setHiveContext(sc)
1907  df <- read.df(jsonPath, "json")
1908
1909  # Test write.df and read.df
1910  write.df(df, orcPath, "orc", mode = "overwrite")
1911  df2 <- read.df(orcPath, "orc")
1912  expect_is(df2, "SparkDataFrame")
1913  expect_equal(count(df), count(df2))
1914
1915  # Test write.orc and read.orc
1916  orcPath2 <- tempfile(pattern = "orcPath2", fileext = ".orc")
1917  write.orc(df, orcPath2)
1918  orcDF <- read.orc(orcPath2)
1919  expect_is(orcDF, "SparkDataFrame")
1920  expect_equal(count(orcDF), count(df))
1921
1922  unlink(orcPath2)
1923  unsetHiveContext()
1924})
1925
1926test_that("read/write ORC files - compression option", {
1927  setHiveContext(sc)
1928  df <- read.df(jsonPath, "json")
1929
1930  orcPath2 <- tempfile(pattern = "orcPath2", fileext = ".orc")
1931  write.orc(df, orcPath2, compression = "ZLIB")
1932  orcDF <- read.orc(orcPath2)
1933  expect_is(orcDF, "SparkDataFrame")
1934  expect_equal(count(orcDF), count(df))
1935  expect_true(length(list.files(orcPath2, pattern = ".zlib.orc")) > 0)
1936
1937  unlink(orcPath2)
1938  unsetHiveContext()
1939})
1940
1941test_that("read/write Parquet files", {
1942  df <- read.df(jsonPath, "json")
1943  # Test write.df and read.df
1944  write.df(df, parquetPath, "parquet", mode = "overwrite")
1945  df2 <- read.df(parquetPath, "parquet")
1946  expect_is(df2, "SparkDataFrame")
1947  expect_equal(count(df2), 3)
1948
1949  # Test write.parquet/saveAsParquetFile and read.parquet/parquetFile
1950  parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
1951  write.parquet(df, parquetPath2)
1952  parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
1953  suppressWarnings(saveAsParquetFile(df, parquetPath3))
1954  parquetDF <- read.parquet(c(parquetPath2, parquetPath3))
1955  expect_is(parquetDF, "SparkDataFrame")
1956  expect_equal(count(parquetDF), count(df) * 2)
1957  parquetDF2 <- suppressWarnings(parquetFile(parquetPath2, parquetPath3))
1958  expect_is(parquetDF2, "SparkDataFrame")
1959  expect_equal(count(parquetDF2), count(df) * 2)
1960
1961  # Test if varargs works with variables
1962  saveMode <- "overwrite"
1963  mergeSchema <- "true"
1964  parquetPath4 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
1965  write.df(df, parquetPath3, "parquet", mode = saveMode, mergeSchema = mergeSchema)
1966
1967  unlink(parquetPath2)
1968  unlink(parquetPath3)
1969  unlink(parquetPath4)
1970})
1971
1972test_that("read/write Parquet files - compression option/mode", {
1973  df <- read.df(jsonPath, "json")
1974  tempPath <- tempfile(pattern = "tempPath", fileext = ".parquet")
1975
1976  # Test write.df and read.df
1977  write.parquet(df, tempPath, compression = "GZIP")
1978  df2 <- read.parquet(tempPath)
1979  expect_is(df2, "SparkDataFrame")
1980  expect_equal(count(df2), 3)
1981  expect_true(length(list.files(tempPath, pattern = ".gz.parquet")) > 0)
1982
1983  write.parquet(df, tempPath, mode = "overwrite")
1984  df3 <- read.parquet(tempPath)
1985  expect_is(df3, "SparkDataFrame")
1986  expect_equal(count(df3), 3)
1987})
1988
1989test_that("read/write text files", {
1990  # Test write.df and read.df
1991  df <- read.df(jsonPath, "text")
1992  expect_is(df, "SparkDataFrame")
1993  expect_equal(colnames(df), c("value"))
1994  expect_equal(count(df), 3)
1995  textPath <- tempfile(pattern = "textPath", fileext = ".txt")
1996  write.df(df, textPath, "text", mode = "overwrite")
1997
1998  # Test write.text and read.text
1999  textPath2 <- tempfile(pattern = "textPath2", fileext = ".txt")
2000  write.text(df, textPath2)
2001  df2 <- read.text(c(textPath, textPath2))
2002  expect_is(df2, "SparkDataFrame")
2003  expect_equal(colnames(df2), c("value"))
2004  expect_equal(count(df2), count(df) * 2)
2005
2006  unlink(textPath)
2007  unlink(textPath2)
2008})
2009
2010test_that("read/write text files - compression option", {
2011  df <- read.df(jsonPath, "text")
2012
2013  textPath <- tempfile(pattern = "textPath", fileext = ".txt")
2014  write.text(df, textPath, compression = "GZIP")
2015  textDF <- read.text(textPath)
2016  expect_is(textDF, "SparkDataFrame")
2017  expect_equal(count(textDF), count(df))
2018  expect_true(length(list.files(textPath, pattern = ".gz")) > 0)
2019
2020  unlink(textPath)
2021})
2022
2023test_that("describe() and summarize() on a DataFrame", {
2024  df <- read.json(jsonPath)
2025  stats <- describe(df, "age")
2026  expect_equal(collect(stats)[1, "summary"], "count")
2027  expect_equal(collect(stats)[2, "age"], "24.5")
2028  expect_equal(collect(stats)[3, "age"], "7.7781745930520225")
2029  stats <- describe(df)
2030  expect_equal(collect(stats)[4, "summary"], "min")
2031  expect_equal(collect(stats)[5, "age"], "30")
2032
2033  stats2 <- summary(df)
2034  expect_equal(collect(stats2)[4, "summary"], "min")
2035  expect_equal(collect(stats2)[5, "age"], "30")
2036
2037  # SPARK-16425: SparkR summary() fails on column of type logical
2038  df <- withColumn(df, "boolean", df$age == 30)
2039  summary(df)
2040
2041  # Test base::summary is working
2042  expect_equal(length(summary(attenu, digits = 4)), 35)
2043})
2044
2045test_that("dropna() and na.omit() on a DataFrame", {
2046  df <- read.json(jsonPathNa)
2047  rows <- collect(df)
2048
2049  # drop with columns
2050
2051  expected <- rows[!is.na(rows$name), ]
2052  actual <- collect(dropna(df, cols = "name"))
2053  expect_identical(expected, actual)
2054  actual <- collect(na.omit(df, cols = "name"))
2055  expect_identical(expected, actual)
2056
2057  expected <- rows[!is.na(rows$age), ]
2058  actual <- collect(dropna(df, cols = "age"))
2059  row.names(expected) <- row.names(actual)
2060  # identical on two dataframes does not work here. Don't know why.
2061  # use identical on all columns as a workaround.
2062  expect_identical(expected$age, actual$age)
2063  expect_identical(expected$height, actual$height)
2064  expect_identical(expected$name, actual$name)
2065  actual <- collect(na.omit(df, cols = "age"))
2066
2067  expected <- rows[!is.na(rows$age) & !is.na(rows$height), ]
2068  actual <- collect(dropna(df, cols = c("age", "height")))
2069  expect_identical(expected, actual)
2070  actual <- collect(na.omit(df, cols = c("age", "height")))
2071  expect_identical(expected, actual)
2072
2073  expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name), ]
2074  actual <- collect(dropna(df))
2075  expect_identical(expected, actual)
2076  actual <- collect(na.omit(df))
2077  expect_identical(expected, actual)
2078
2079  # drop with how
2080
2081  expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name), ]
2082  actual <- collect(dropna(df))
2083  expect_identical(expected, actual)
2084  actual <- collect(na.omit(df))
2085  expect_identical(expected, actual)
2086
2087  expected <- rows[!is.na(rows$age) | !is.na(rows$height) | !is.na(rows$name), ]
2088  actual <- collect(dropna(df, "all"))
2089  expect_identical(expected, actual)
2090  actual <- collect(na.omit(df, "all"))
2091  expect_identical(expected, actual)
2092
2093  expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name), ]
2094  actual <- collect(dropna(df, "any"))
2095  expect_identical(expected, actual)
2096  actual <- collect(na.omit(df, "any"))
2097  expect_identical(expected, actual)
2098
2099  expected <- rows[!is.na(rows$age) & !is.na(rows$height), ]
2100  actual <- collect(dropna(df, "any", cols = c("age", "height")))
2101  expect_identical(expected, actual)
2102  actual <- collect(na.omit(df, "any", cols = c("age", "height")))
2103  expect_identical(expected, actual)
2104
2105  expected <- rows[!is.na(rows$age) | !is.na(rows$height), ]
2106  actual <- collect(dropna(df, "all", cols = c("age", "height")))
2107  expect_identical(expected, actual)
2108  actual <- collect(na.omit(df, "all", cols = c("age", "height")))
2109  expect_identical(expected, actual)
2110
2111  # drop with threshold
2112
2113  expected <- rows[as.integer(!is.na(rows$age)) + as.integer(!is.na(rows$height)) >= 2, ]
2114  actual <- collect(dropna(df, minNonNulls = 2, cols = c("age", "height")))
2115  expect_identical(expected, actual)
2116  actual <- collect(na.omit(df, minNonNulls = 2, cols = c("age", "height")))
2117  expect_identical(expected, actual)
2118
2119  expected <- rows[as.integer(!is.na(rows$age)) +
2120                   as.integer(!is.na(rows$height)) +
2121                   as.integer(!is.na(rows$name)) >= 3, ]
2122  actual <- collect(dropna(df, minNonNulls = 3, cols = c("name", "age", "height")))
2123  expect_identical(expected, actual)
2124  actual <- collect(na.omit(df, minNonNulls = 3, cols = c("name", "age", "height")))
2125  expect_identical(expected, actual)
2126
2127  # Test stats::na.omit is working
2128  expect_equal(nrow(na.omit(data.frame(x = c(0, 10, NA)))), 2)
2129})
2130
2131test_that("fillna() on a DataFrame", {
2132  df <- read.json(jsonPathNa)
2133  rows <- collect(df)
2134
2135  # fill with value
2136
2137  expected <- rows
2138  expected$age[is.na(expected$age)] <- 50
2139  expected$height[is.na(expected$height)] <- 50.6
2140  actual <- collect(fillna(df, 50.6))
2141  expect_identical(expected, actual)
2142
2143  expected <- rows
2144  expected$name[is.na(expected$name)] <- "unknown"
2145  actual <- collect(fillna(df, "unknown"))
2146  expect_identical(expected, actual)
2147
2148  expected <- rows
2149  expected$age[is.na(expected$age)] <- 50
2150  actual <- collect(fillna(df, 50.6, "age"))
2151  expect_identical(expected, actual)
2152
2153  expected <- rows
2154  expected$name[is.na(expected$name)] <- "unknown"
2155  actual <- collect(fillna(df, "unknown", c("age", "name")))
2156  expect_identical(expected, actual)
2157
2158  # fill with named list
2159
2160  expected <- rows
2161  expected$age[is.na(expected$age)] <- 50
2162  expected$height[is.na(expected$height)] <- 50.6
2163  expected$name[is.na(expected$name)] <- "unknown"
2164  actual <- collect(fillna(df, list("age" = 50, "height" = 50.6, "name" = "unknown")))
2165  expect_identical(expected, actual)
2166})
2167
2168test_that("crosstab() on a DataFrame", {
2169  rdd <- lapply(parallelize(sc, 0:3), function(x) {
2170    list(paste0("a", x %% 3), paste0("b", x %% 2))
2171  })
2172  df <- toDF(rdd, list("a", "b"))
2173  ct <- crosstab(df, "a", "b")
2174  ordered <- ct[order(ct$a_b), ]
2175  row.names(ordered) <- NULL
2176  expected <- data.frame("a_b" = c("a0", "a1", "a2"), "b0" = c(1, 0, 1), "b1" = c(1, 1, 0),
2177                         stringsAsFactors = FALSE, row.names = NULL)
2178  expect_identical(expected, ordered)
2179})
2180
2181test_that("cov() and corr() on a DataFrame", {
2182  l <- lapply(c(0:9), function(x) { list(x, x * 2.0) })
2183  df <- createDataFrame(l, c("singles", "doubles"))
2184  result <- cov(df, "singles", "doubles")
2185  expect_true(abs(result - 55.0 / 3) < 1e-12)
2186
2187  result <- corr(df, "singles", "doubles")
2188  expect_true(abs(result - 1.0) < 1e-12)
2189  result <- corr(df, "singles", "doubles", "pearson")
2190  expect_true(abs(result - 1.0) < 1e-12)
2191
2192  # Test stats::cov is working
2193  #expect_true(abs(max(cov(swiss)) - 1739.295) < 1e-3) # nolint
2194})
2195
2196test_that("freqItems() on a DataFrame", {
2197  input <- 1:1000
2198  rdf <- data.frame(numbers = input, letters = as.character(input),
2199                    negDoubles = input * -1.0, stringsAsFactors = F)
2200  rdf[ input %% 3 == 0, ] <- c(1, "1", -1)
2201  df <- createDataFrame(rdf)
2202  multiColResults <- freqItems(df, c("numbers", "letters"), support = 0.1)
2203  expect_true(1 %in% multiColResults$numbers[[1]])
2204  expect_true("1" %in% multiColResults$letters[[1]])
2205  singleColResult <- freqItems(df, "negDoubles", support = 0.1)
2206  expect_true(-1 %in% head(singleColResult$negDoubles)[[1]])
2207
2208  l <- lapply(c(0:99), function(i) {
2209    if (i %% 2 == 0) { list(1L, -1.0) }
2210    else { list(i, i * -1.0) }})
2211  df <- createDataFrame(l, c("a", "b"))
2212  result <- freqItems(df, c("a", "b"), 0.4)
2213  expect_identical(result[[1]], list(list(1L, 99L)))
2214  expect_identical(result[[2]], list(list(-1, -99)))
2215})
2216
2217test_that("sampleBy() on a DataFrame", {
2218  l <- lapply(c(0:99), function(i) { as.character(i %% 3) })
2219  df <- createDataFrame(l, "key")
2220  fractions <- list("0" = 0.1, "1" = 0.2)
2221  sample <- sampleBy(df, "key", fractions, 0)
2222  result <- collect(orderBy(count(groupBy(sample, "key")), "key"))
2223  expect_identical(as.list(result[1, ]), list(key = "0", count = 3))
2224  expect_identical(as.list(result[2, ]), list(key = "1", count = 7))
2225})
2226
2227test_that("approxQuantile() on a DataFrame", {
2228  l <- lapply(c(0:99), function(i) { i })
2229  df <- createDataFrame(l, "key")
2230  quantiles <- approxQuantile(df, "key", c(0.5, 0.8), 0.0)
2231  expect_equal(quantiles[[1]], 50)
2232  expect_equal(quantiles[[2]], 80)
2233})
2234
2235test_that("SQL error message is returned from JVM", {
2236  retError <- tryCatch(sql("select * from blah"), error = function(e) e)
2237  expect_equal(grepl("Table or view not found", retError), TRUE)
2238  expect_equal(grepl("blah", retError), TRUE)
2239})
2240
2241irisDF <- suppressWarnings(createDataFrame(iris))
2242
2243test_that("Method as.data.frame as a synonym for collect()", {
2244  expect_equal(as.data.frame(irisDF), collect(irisDF))
2245  irisDF2 <- irisDF[irisDF$Species == "setosa", ]
2246  expect_equal(as.data.frame(irisDF2), collect(irisDF2))
2247
2248  # Make sure as.data.frame in the R base package is not covered
2249  expect_error(as.data.frame(c(1, 2)), NA)
2250})
2251
2252test_that("attach() on a DataFrame", {
2253  df <- read.json(jsonPath)
2254  expect_error(age)
2255  attach(df)
2256  expect_is(age, "SparkDataFrame")
2257  expected_age <- data.frame(age = c(NA, 30, 19))
2258  expect_equal(head(age), expected_age)
2259  stat <- summary(age)
2260  expect_equal(collect(stat)[5, "age"], "30")
2261  age <- age$age + 1
2262  expect_is(age, "Column")
2263  rm(age)
2264  stat2 <- summary(age)
2265  expect_equal(collect(stat2)[5, "age"], "30")
2266  detach("df")
2267  stat3 <- summary(df[, "age", drop = F])
2268  expect_equal(collect(stat3)[5, "age"], "30")
2269  expect_error(age)
2270})
2271
2272test_that("with() on a DataFrame", {
2273  df <- suppressWarnings(createDataFrame(iris))
2274  expect_error(Sepal_Length)
2275  sum1 <- with(df, list(summary(Sepal_Length), summary(Sepal_Width)))
2276  expect_equal(collect(sum1[[1]])[1, "Sepal_Length"], "150")
2277  sum2 <- with(df, distinct(Sepal_Length))
2278  expect_equal(nrow(sum2), 35)
2279})
2280
2281test_that("Method coltypes() to get and set R's data types of a DataFrame", {
2282  expect_equal(coltypes(irisDF), c(rep("numeric", 4), "character"))
2283
2284  data <- data.frame(c1 = c(1, 2, 3),
2285                     c2 = c(T, F, T),
2286                     c3 = c("2015/01/01 10:00:00", "2015/01/02 10:00:00", "2015/01/03 10:00:00"))
2287
2288  schema <- structType(structField("c1", "byte"),
2289                       structField("c3", "boolean"),
2290                       structField("c4", "timestamp"))
2291
2292  # Test primitive types
2293  DF <- createDataFrame(data, schema)
2294  expect_equal(coltypes(DF), c("integer", "logical", "POSIXct"))
2295  createOrReplaceTempView(DF, "DFView")
2296  sqlCast <- sql("select cast('2' as decimal) as x from DFView limit 1")
2297  expect_equal(coltypes(sqlCast), "numeric")
2298
2299  # Test complex types
2300  x <- createDataFrame(list(list(as.environment(
2301    list("a" = "b", "c" = "d", "e" = "f")))))
2302  expect_equal(coltypes(x), "map<string,string>")
2303
2304  df <- selectExpr(read.json(jsonPath), "name", "(age * 1.21) as age")
2305  expect_equal(dtypes(df), list(c("name", "string"), c("age", "decimal(24,2)")))
2306
2307  df1 <- select(df, cast(df$age, "integer"))
2308  coltypes(df) <- c("character", "integer")
2309  expect_equal(dtypes(df), list(c("name", "string"), c("age", "int")))
2310  value <- collect(df[, 2, drop = F])[[3, 1]]
2311  expect_equal(value, collect(df1)[[3, 1]])
2312  expect_equal(value, 22)
2313
2314  coltypes(df) <- c(NA, "numeric")
2315  expect_equal(dtypes(df), list(c("name", "string"), c("age", "double")))
2316
2317  expect_error(coltypes(df) <- c("character"),
2318               "Length of type vector should match the number of columns for SparkDataFrame")
2319  expect_error(coltypes(df) <- c("environment", "list"),
2320               "Only atomic type is supported for column types")
2321})
2322
2323test_that("Method str()", {
2324  # Structure of Iris
2325  iris2 <- iris
2326  colnames(iris2) <- c("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species")
2327  iris2$col <- TRUE
2328  irisDF2 <- createDataFrame(iris2)
2329
2330  out <- capture.output(str(irisDF2))
2331  expect_equal(length(out), 7)
2332  expect_equal(out[1], "'SparkDataFrame': 6 variables:")
2333  expect_equal(out[2], " $ Sepal_Length: num 5.1 4.9 4.7 4.6 5 5.4")
2334  expect_equal(out[3], " $ Sepal_Width : num 3.5 3 3.2 3.1 3.6 3.9")
2335  expect_equal(out[4], " $ Petal_Length: num 1.4 1.4 1.3 1.5 1.4 1.7")
2336  expect_equal(out[5], " $ Petal_Width : num 0.2 0.2 0.2 0.2 0.2 0.4")
2337  expect_equal(out[6], paste0(" $ Species     : chr \"setosa\" \"setosa\" \"",
2338                              "setosa\" \"setosa\" \"setosa\" \"setosa\""))
2339  expect_equal(out[7], " $ col         : logi TRUE TRUE TRUE TRUE TRUE TRUE")
2340
2341  createOrReplaceTempView(irisDF2, "irisView")
2342
2343  sqlCast <- sql("select cast('2' as decimal) as x from irisView limit 1")
2344  castStr <- capture.output(str(sqlCast))
2345  expect_equal(length(castStr), 2)
2346  expect_equal(castStr[1], "'SparkDataFrame': 1 variables:")
2347  expect_equal(castStr[2], " $ x: num 2")
2348
2349  # A random dataset with many columns. This test is to check str limits
2350  # the number of columns. Therefore, it will suffice to check for the
2351  # number of returned rows
2352  x <- runif(200, 1, 10)
2353  df <- data.frame(t(as.matrix(data.frame(x, x, x, x, x, x, x, x, x))))
2354  DF <- createDataFrame(df)
2355  out <- capture.output(str(DF))
2356  expect_equal(length(out), 103)
2357
2358  # Test utils:::str
2359  expect_equal(capture.output(utils:::str(iris)), capture.output(str(iris)))
2360})
2361
2362test_that("Histogram", {
2363
2364  # Basic histogram test with colname
2365  expect_equal(
2366    all(histogram(irisDF, "Petal_Width", 8) ==
2367        data.frame(bins = seq(0, 7),
2368                   counts = c(48, 2, 7, 21, 24, 19, 15, 14),
2369                   centroids = seq(0, 7) * 0.3 + 0.25)),
2370        TRUE)
2371
2372  # Basic histogram test with Column
2373  expect_equal(
2374    all(histogram(irisDF, irisDF$Petal_Width, 8) ==
2375          data.frame(bins = seq(0, 7),
2376                     counts = c(48, 2, 7, 21, 24, 19, 15, 14),
2377                     centroids = seq(0, 7) * 0.3 + 0.25)),
2378    TRUE)
2379
2380  # Basic histogram test with derived column
2381  expect_equal(
2382    all(round(histogram(irisDF, irisDF$Petal_Width + 1, 8), 2) ==
2383          data.frame(bins = seq(0, 7),
2384                     counts = c(48, 2, 7, 21, 24, 19, 15, 14),
2385                     centroids = seq(0, 7) * 0.3 + 1.25)),
2386    TRUE)
2387
2388  # Missing nbins
2389  expect_equal(length(histogram(irisDF, "Petal_Width")$counts), 10)
2390
2391  # Wrong colname
2392  expect_error(histogram(irisDF, "xxx"),
2393               "Specified colname does not belong to the given SparkDataFrame.")
2394
2395  # Invalid nbins
2396  expect_error(histogram(irisDF, "Petal_Width", nbins = 0),
2397               "The number of bins must be a positive integer number greater than 1.")
2398
2399  # Test against R's hist
2400  expect_equal(all(hist(iris$Sepal.Width)$counts ==
2401                   histogram(irisDF, "Sepal_Width", 12)$counts), T)
2402
2403  # Test when there are zero counts
2404  df <- as.DataFrame(data.frame(x = c(1, 2, 3, 4, 100)))
2405  expect_equal(histogram(df, "x")$counts, c(4, 0, 0, 0, 0, 0, 0, 0, 0, 1))
2406})
2407
2408test_that("dapply() and dapplyCollect() on a DataFrame", {
2409  df <- createDataFrame(
2410          list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")),
2411          c("a", "b", "c"))
2412  ldf <- collect(df)
2413  df1 <- dapply(df, function(x) { x }, schema(df))
2414  result <- collect(df1)
2415  expect_identical(ldf, result)
2416
2417  result <- dapplyCollect(df, function(x) { x })
2418  expect_identical(ldf, result)
2419
2420  # Filter and add a column
2421  schema <- structType(structField("a", "integer"), structField("b", "double"),
2422                       structField("c", "string"), structField("d", "integer"))
2423  df1 <- dapply(
2424           df,
2425           function(x) {
2426             y <- x[x$a > 1, ]
2427             y <- cbind(y, y$a + 1L)
2428           },
2429           schema)
2430  result <- collect(df1)
2431  expected <- ldf[ldf$a > 1, ]
2432  expected$d <- expected$a + 1L
2433  rownames(expected) <- NULL
2434  expect_identical(expected, result)
2435
2436  result <- dapplyCollect(
2437              df,
2438              function(x) {
2439                y <- x[x$a > 1, ]
2440                y <- cbind(y, y$a + 1L)
2441              })
2442  expected1 <- expected
2443  names(expected1) <- names(result)
2444  expect_identical(expected1, result)
2445
2446  # Remove the added column
2447  df2 <- dapply(
2448           df1,
2449           function(x) {
2450             x[, c("a", "b", "c")]
2451           },
2452           schema(df))
2453  result <- collect(df2)
2454  expected <- expected[, c("a", "b", "c")]
2455  expect_identical(expected, result)
2456
2457  result <- dapplyCollect(
2458              df1,
2459              function(x) {
2460               x[, c("a", "b", "c")]
2461              })
2462  expect_identical(expected, result)
2463})
2464
2465test_that("dapplyCollect() on DataFrame with a binary column", {
2466
2467  df <- data.frame(key = 1:3)
2468  df$bytes <- lapply(df$key, serialize, connection = NULL)
2469
2470  df_spark <- createDataFrame(df)
2471
2472  result1 <- collect(df_spark)
2473  expect_identical(df, result1)
2474
2475  result2 <- dapplyCollect(df_spark, function(x) x)
2476  expect_identical(df, result2)
2477
2478  # A data.frame with a single column of bytes
2479  scb <- subset(df, select = "bytes")
2480  scb_spark <- createDataFrame(scb)
2481  result <- dapplyCollect(scb_spark, function(x) x)
2482  expect_identical(scb, result)
2483
2484})
2485
2486test_that("repartition by columns on DataFrame", {
2487  df <- createDataFrame(
2488    list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
2489    c("a", "b", "c", "d"))
2490
2491  # no column and number of partitions specified
2492  retError <- tryCatch(repartition(df), error = function(e) e)
2493  expect_equal(grepl
2494    ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)
2495
2496  # repartition by column and number of partitions
2497  actual <- repartition(df, 3, col = df$"a")
2498
2499  # Checking that at least the dimensions are identical
2500  expect_identical(dim(df), dim(actual))
2501  expect_equal(getNumPartitions(actual), 3L)
2502
2503  # repartition by number of partitions
2504  actual <- repartition(df, 13L)
2505  expect_identical(dim(df), dim(actual))
2506  expect_equal(getNumPartitions(actual), 13L)
2507
2508  expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)
2509
2510  # a test case with a column and dapply
2511  schema <-  structType(structField("a", "integer"), structField("avg", "double"))
2512  df <- repartition(df, col = df$"a")
2513  df1 <- dapply(
2514    df,
2515    function(x) {
2516      y <- (data.frame(x$a[1], mean(x$b)))
2517    },
2518    schema)
2519
2520  # Number of partitions is equal to 2
2521  expect_equal(nrow(df1), 2)
2522})
2523
2524test_that("coalesce, repartition, numPartitions", {
2525  df <- as.DataFrame(cars, numPartitions = 5)
2526  expect_equal(getNumPartitions(df), 5)
2527  expect_equal(getNumPartitions(coalesce(df, 3)), 3)
2528  expect_equal(getNumPartitions(coalesce(df, 6)), 5)
2529
2530  df1 <- coalesce(df, 3)
2531  expect_equal(getNumPartitions(df1), 3)
2532  expect_equal(getNumPartitions(coalesce(df1, 6)), 5)
2533  expect_equal(getNumPartitions(coalesce(df1, 4)), 4)
2534  expect_equal(getNumPartitions(coalesce(df1, 2)), 2)
2535
2536  df2 <- repartition(df1, 10)
2537  expect_equal(getNumPartitions(df2), 10)
2538  expect_equal(getNumPartitions(coalesce(df2, 13)), 5)
2539  expect_equal(getNumPartitions(coalesce(df2, 7)), 5)
2540  expect_equal(getNumPartitions(coalesce(df2, 3)), 3)
2541})
2542
2543test_that("gapply() and gapplyCollect() on a DataFrame", {
2544  df <- createDataFrame (
2545    list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
2546    c("a", "b", "c", "d"))
2547  expected <- collect(df)
2548  df1 <- gapply(df, "a", function(key, x) { x }, schema(df))
2549  actual <- collect(df1)
2550  expect_identical(actual, expected)
2551
2552  df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
2553  expect_identical(df1Collect, expected)
2554
2555  # Computes the sum of second column by grouping on the first and third columns
2556  # and checks if the sum is larger than 2
2557  schema <- structType(structField("a", "integer"), structField("e", "boolean"))
2558  df2 <- gapply(
2559    df,
2560    c(df$"a", df$"c"),
2561    function(key, x) {
2562      y <- data.frame(key[1], sum(x$b) > 2)
2563    },
2564    schema)
2565  actual <- collect(df2)$e
2566  expected <- c(TRUE, TRUE)
2567  expect_identical(actual, expected)
2568
2569  df2Collect <- gapplyCollect(
2570    df,
2571    c(df$"a", df$"c"),
2572    function(key, x) {
2573      y <- data.frame(key[1], sum(x$b) > 2)
2574      colnames(y) <- c("a", "e")
2575      y
2576    })
2577    actual <- df2Collect$e
2578    expect_identical(actual, expected)
2579
2580  # Computes the arithmetic mean of the second column by grouping
2581  # on the first and third columns. Output the groupping value and the average.
2582  schema <-  structType(structField("a", "integer"), structField("c", "string"),
2583               structField("avg", "double"))
2584  df3 <- gapply(
2585    df,
2586    c("a", "c"),
2587    function(key, x) {
2588      y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
2589    },
2590    schema)
2591  actual <- collect(df3)
2592  actual <-  actual[order(actual$a), ]
2593  rownames(actual) <- NULL
2594  expected <- collect(select(df, "a", "b", "c"))
2595  expected <- data.frame(aggregate(expected$b, by = list(expected$a, expected$c), FUN = mean))
2596  colnames(expected) <- c("a", "c", "avg")
2597  expected <-  expected[order(expected$a), ]
2598  rownames(expected) <- NULL
2599  expect_identical(actual, expected)
2600
2601  df3Collect <- gapplyCollect(
2602    df,
2603    c("a", "c"),
2604    function(key, x) {
2605      y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
2606      colnames(y) <- c("a", "c", "avg")
2607      y
2608    })
2609  actual <- df3Collect[order(df3Collect$a), ]
2610  expect_identical(actual$avg, expected$avg)
2611
2612  irisDF <- suppressWarnings(createDataFrame (iris))
2613  schema <-  structType(structField("Sepal_Length", "double"), structField("Avg", "double"))
2614  # Groups by `Sepal_Length` and computes the average for `Sepal_Width`
2615  df4 <- gapply(
2616    cols = "Sepal_Length",
2617    irisDF,
2618    function(key, x) {
2619      y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE)
2620    },
2621    schema)
2622  actual <- collect(df4)
2623  actual <- actual[order(actual$Sepal_Length), ]
2624  rownames(actual) <- NULL
2625  agg_local_df <- data.frame(aggregate(iris$Sepal.Width, by = list(iris$Sepal.Length), FUN = mean),
2626                    stringsAsFactors = FALSE)
2627  colnames(agg_local_df) <- c("Sepal_Length", "Avg")
2628  expected <-  agg_local_df[order(agg_local_df$Sepal_Length), ]
2629  rownames(expected) <- NULL
2630  expect_identical(actual, expected)
2631})
2632
2633test_that("Window functions on a DataFrame", {
2634  df <- createDataFrame(list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")),
2635                        schema = c("key", "value"))
2636  ws <- orderBy(windowPartitionBy("key"), "value")
2637  result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
2638  names(result) <- c("key", "value")
2639  expected <- data.frame(key = c(1L, NA, 2L, NA),
2640                       value = c("1", NA, "2", NA),
2641                       stringsAsFactors = FALSE)
2642  expect_equal(result, expected)
2643
2644  ws <- orderBy(windowPartitionBy(df$key), df$value)
2645  result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
2646  names(result) <- c("key", "value")
2647  expect_equal(result, expected)
2648
2649  ws <- partitionBy(windowOrderBy("value"), "key")
2650  result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
2651  names(result) <- c("key", "value")
2652  expect_equal(result, expected)
2653
2654  ws <- partitionBy(windowOrderBy(df$value), df$key)
2655  result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
2656  names(result) <- c("key", "value")
2657  expect_equal(result, expected)
2658})
2659
2660test_that("createDataFrame sqlContext parameter backward compatibility", {
2661  sqlContext <- suppressWarnings(sparkRSQL.init(sc))
2662  a <- 1:3
2663  b <- c("a", "b", "c")
2664  ldf <- data.frame(a, b)
2665  # Call function with namespace :: operator - SPARK-16538
2666  df <- suppressWarnings(SparkR::createDataFrame(sqlContext, ldf))
2667  expect_equal(columns(df), c("a", "b"))
2668  expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
2669  expect_equal(count(df), 3)
2670  ldf2 <- collect(df)
2671  expect_equal(ldf$a, ldf2$a)
2672
2673  df2 <- suppressWarnings(createDataFrame(sqlContext, iris))
2674  expect_equal(count(df2), 150)
2675  expect_equal(ncol(df2), 5)
2676
2677  df3 <- suppressWarnings(read.df(sqlContext, jsonPath, "json"))
2678  expect_is(df3, "SparkDataFrame")
2679  expect_equal(count(df3), 3)
2680
2681  before <- suppressWarnings(createDataFrame(sqlContext, iris))
2682  after <- suppressWarnings(createDataFrame(iris))
2683  expect_equal(collect(before), collect(after))
2684
2685  # more tests for SPARK-16538
2686  createOrReplaceTempView(df, "table")
2687  SparkR::tables()
2688  SparkR::sql("SELECT 1")
2689  suppressWarnings(SparkR::sql(sqlContext, "SELECT * FROM table"))
2690  suppressWarnings(SparkR::dropTempTable(sqlContext, "table"))
2691})
2692
2693test_that("randomSplit", {
2694  num <- 4000
2695  df <- createDataFrame(data.frame(id = 1:num))
2696  weights <- c(2, 3, 5)
2697  df_list <- randomSplit(df, weights)
2698  expect_equal(length(weights), length(df_list))
2699  counts <- sapply(df_list, count)
2700  expect_equal(num, sum(counts))
2701  expect_true(all(sapply(abs(counts / num - weights / sum(weights)), function(e) { e < 0.05 })))
2702
2703  df_list <- randomSplit(df, weights, 0)
2704  expect_equal(length(weights), length(df_list))
2705  counts <- sapply(df_list, count)
2706  expect_equal(num, sum(counts))
2707  expect_true(all(sapply(abs(counts / num - weights / sum(weights)), function(e) { e < 0.05 })))
2708})
2709
2710test_that("Setting and getting config on SparkSession, sparkR.conf(), sparkR.uiWebUrl()", {
2711  # first, set it to a random but known value
2712  conf <- callJMethod(sparkSession, "conf")
2713  property <- paste0("spark.testing.", as.character(runif(1)))
2714  value1 <- as.character(runif(1))
2715  callJMethod(conf, "set", property, value1)
2716
2717  # next, change the same property to the new value
2718  value2 <- as.character(runif(1))
2719  l <- list(value2)
2720  names(l) <- property
2721  sparkR.session(sparkConfig = l)
2722
2723  newValue <- unlist(sparkR.conf(property, ""), use.names = FALSE)
2724  expect_equal(value2, newValue)
2725
2726  value <- as.character(runif(1))
2727  sparkR.session(spark.app.name = "sparkSession test", spark.testing.r.session.r = value)
2728  allconf <- sparkR.conf()
2729  appNameValue <- allconf[["spark.app.name"]]
2730  testValue <- allconf[["spark.testing.r.session.r"]]
2731  expect_equal(appNameValue, "sparkSession test")
2732  expect_equal(testValue, value)
2733  expect_error(sparkR.conf("completely.dummy"), "Config 'completely.dummy' is not set")
2734
2735  url <- sparkR.uiWebUrl()
2736  expect_equal(substr(url, 1, 7), "http://")
2737})
2738
2739test_that("enableHiveSupport on SparkSession", {
2740  setHiveContext(sc)
2741  unsetHiveContext()
2742  # if we are still here, it must be built with hive
2743  conf <- callJMethod(sparkSession, "conf")
2744  value <- callJMethod(conf, "get", "spark.sql.catalogImplementation")
2745  expect_equal(value, "hive")
2746})
2747
2748test_that("Spark version from SparkSession", {
2749  ver <- callJMethod(sc, "version")
2750  version <- sparkR.version()
2751  expect_equal(ver, version)
2752})
2753
2754test_that("Call DataFrameWriter.save() API in Java without path and check argument types", {
2755  df <- read.df(jsonPath, "json")
2756  # This tests if the exception is thrown from JVM not from SparkR side.
2757  # It makes sure that we can omit path argument in write.df API and then it calls
2758  # DataFrameWriter.save() without path.
2759  expect_error(write.df(df, source = "csv"),
2760              "Error in save : illegal argument - Expected exactly one path to be specified")
2761  expect_error(write.json(df, jsonPath),
2762              "Error in json : analysis error - path file:.*already exists")
2763  expect_error(write.text(df, jsonPath),
2764              "Error in text : analysis error - path file:.*already exists")
2765  expect_error(write.orc(df, jsonPath),
2766              "Error in orc : analysis error - path file:.*already exists")
2767  expect_error(write.parquet(df, jsonPath),
2768              "Error in parquet : analysis error - path file:.*already exists")
2769
2770  # Arguments checking in R side.
2771  expect_error(write.df(df, "data.tmp", source = c(1, 2)),
2772               paste("source should be character, NULL or omitted. It is the datasource specified",
2773                     "in 'spark.sql.sources.default' configuration by default."))
2774  expect_error(write.df(df, path = c(3)),
2775               "path should be charactor, NULL or omitted.")
2776  expect_error(write.df(df, mode = TRUE),
2777               "mode should be charactor or omitted. It is 'error' by default.")
2778})
2779
2780test_that("Call DataFrameWriter.load() API in Java without path and check argument types", {
2781  # This tests if the exception is thrown from JVM not from SparkR side.
2782  # It makes sure that we can omit path argument in read.df API and then it calls
2783  # DataFrameWriter.load() without path.
2784  expect_error(read.df(source = "json"),
2785               paste("Error in loadDF : analysis error - Unable to infer schema for JSON.",
2786                     "It must be specified manually"))
2787  expect_error(read.df("arbitrary_path"), "Error in loadDF : analysis error - Path does not exist")
2788  expect_error(read.json("arbitrary_path"), "Error in json : analysis error - Path does not exist")
2789  expect_error(read.text("arbitrary_path"), "Error in text : analysis error - Path does not exist")
2790  expect_error(read.orc("arbitrary_path"), "Error in orc : analysis error - Path does not exist")
2791  expect_error(read.parquet("arbitrary_path"),
2792              "Error in parquet : analysis error - Path does not exist")
2793
2794  # Arguments checking in R side.
2795  expect_error(read.df(path = c(3)),
2796               "path should be charactor, NULL or omitted.")
2797  expect_error(read.df(jsonPath, source = c(1, 2)),
2798               paste("source should be character, NULL or omitted. It is the datasource specified",
2799                     "in 'spark.sql.sources.default' configuration by default."))
2800
2801  expect_warning(read.json(jsonPath, a = 1, 2, 3, "a"),
2802                 "Unnamed arguments ignored: 2, 3, a.")
2803})
2804
2805test_that("Collect on DataFrame when NAs exists at the top of a timestamp column", {
2806  ldf <- data.frame(col1 = c(0, 1, 2),
2807                   col2 = c(as.POSIXct("2017-01-01 00:00:01"),
2808                            NA,
2809                            as.POSIXct("2017-01-01 12:00:01")),
2810                   col3 = c(as.POSIXlt("2016-01-01 00:59:59"),
2811                            NA,
2812                            as.POSIXlt("2016-01-01 12:01:01")))
2813  sdf1 <- createDataFrame(ldf)
2814  ldf1 <- collect(sdf1)
2815  expect_equal(dtypes(sdf1), list(c("col1", "double"),
2816                                  c("col2", "timestamp"),
2817                                  c("col3", "timestamp")))
2818  expect_equal(class(ldf1$col1), "numeric")
2819  expect_equal(class(ldf1$col2), c("POSIXct", "POSIXt"))
2820  expect_equal(class(ldf1$col3), c("POSIXct", "POSIXt"))
2821
2822  # Columns with NAs at the top
2823  sdf2 <- filter(sdf1, "col1 > 1")
2824  ldf2 <- collect(sdf2)
2825  expect_equal(dtypes(sdf2), list(c("col1", "double"),
2826                                  c("col2", "timestamp"),
2827                                  c("col3", "timestamp")))
2828  expect_equal(class(ldf2$col1), "numeric")
2829  expect_equal(class(ldf2$col2), c("POSIXct", "POSIXt"))
2830  expect_equal(class(ldf2$col3), c("POSIXct", "POSIXt"))
2831
2832  # Columns with only NAs, the type will also be cast to PRIMITIVE_TYPE
2833  sdf3 <- filter(sdf1, "col1 == 0")
2834  ldf3 <- collect(sdf3)
2835  expect_equal(dtypes(sdf3), list(c("col1", "double"),
2836                                  c("col2", "timestamp"),
2837                                  c("col3", "timestamp")))
2838  expect_equal(class(ldf3$col1), "numeric")
2839  expect_equal(class(ldf3$col2), c("POSIXct", "POSIXt"))
2840  expect_equal(class(ldf3$col3), c("POSIXct", "POSIXt"))
2841})
2842
2843compare_list <- function(list1, list2) {
2844  # get testthat to show the diff by first making the 2 lists equal in length
2845  expect_equal(length(list1), length(list2))
2846  l <- max(length(list1), length(list2))
2847  length(list1) <- l
2848  length(list2) <- l
2849  expect_equal(sort(list1, na.last = TRUE), sort(list2, na.last = TRUE))
2850}
2851
2852# This should always be the **very last test** in this test file.
2853test_that("No extra files are created in SPARK_HOME by starting session and making calls", {
2854  skip_on_cran()
2855
2856  # Check that it is not creating any extra file.
2857  # Does not check the tempdir which would be cleaned up after.
2858  filesAfter <- list.files(path = sparkRDir, all.files = TRUE)
2859
2860  expect_true(length(sparkRFilesBefore) > 0)
2861  # first, ensure derby.log is not there
2862  expect_false("derby.log" %in% filesAfter)
2863  # second, ensure only spark-warehouse is created when calling SparkSession, enableHiveSupport = F
2864  # note: currently all other test files have enableHiveSupport = F, so we capture the list of files
2865  # before creating a SparkSession with enableHiveSupport = T at the top of this test file
2866  # (filesBefore). The test here is to compare that (filesBefore) against the list of files before
2867  # any test is run in run-all.R (sparkRFilesBefore).
2868  # sparkRWhitelistSQLDirs is also defined in run-all.R, and should contain only 2 whitelisted dirs,
2869  # here allow the first value, spark-warehouse, in the diff, everything else should be exactly the
2870  # same as before any test is run.
2871  compare_list(sparkRFilesBefore, setdiff(filesBefore, sparkRWhitelistSQLDirs[[1]]))
2872  # third, ensure only spark-warehouse and metastore_db are created when enableHiveSupport = T
2873  # note: as the note above, after running all tests in this file while enableHiveSupport = T, we
2874  # check the list of files again. This time we allow both whitelisted dirs to be in the diff.
2875  compare_list(sparkRFilesBefore, setdiff(filesAfter, sparkRWhitelistSQLDirs))
2876})
2877
2878unlink(parquetPath)
2879unlink(orcPath)
2880unlink(jsonPath)
2881unlink(jsonPathNa)
2882
2883sparkR.session.stop()
2884