1#
2# Licensed to the Apache Software Foundation (ASF) under one or more
3# contributor license agreements.  See the NOTICE file distributed with
4# this work for additional information regarding copyright ownership.
5# The ASF licenses this file to You under the Apache License, Version 2.0
6# (the "License"); you may not use this file except in compliance with
7# the License.  You may obtain a copy of the License at
8#
9#    http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18# Utility functions to deserialize objects from Java.
19
20# nolint start
21# Type mapping from Java to R
22#
23# void -> NULL
24# Int -> integer
25# String -> character
26# Boolean -> logical
27# Float -> double
28# Double -> double
29# Long -> double
30# Array[Byte] -> raw
31# Date -> Date
32# Time -> POSIXct
33#
34# Array[T] -> list()
35# Object -> jobj
36#
37# nolint end
38
# Read one self-describing object from `con`: a one-byte type tag
# followed by the payload for that type.
readObject <- function(con) {
  readTypedObject(con, readType(con))
}
44
# Read the payload of an object whose type tag has already been consumed.
#
# con:  connection positioned at the start of the payload.
# type: single-character type tag selecting the reader to use.
#
# Signals an error for a tag that has no reader.
readTypedObject <- function(con, type) {
  switch(type,
    # Null marker: nothing further is read from the connection.
    "n" = NULL,
    # Scalar values and byte arrays
    "i" = readInt(con),
    "d" = readDouble(con),
    "b" = readBoolean(con),
    "c" = readString(con),
    "r" = readRaw(con),
    # Dates and timestamps
    "D" = readDate(con),
    "t" = readTime(con),
    # Containers
    "a" = readArray(con),
    "l" = readList(con),
    "e" = readEnv(con),
    "s" = readStruct(con),
    # Object reference: a string is read and passed to getJobj
    "j" = getJobj(readString(con)),
    stop(paste("Unsupported type for deserialization", type)))
}
62
# Read a length-prefixed string: an integer byte count followed by that
# many bytes, which are marked as UTF-8.
readString <- function(con) {
  numBytes <- readInt(con)
  decoded <- rawToChar(readBin(con, raw(), numBytes, endian = "big"))
  Encoding(decoded) <- "UTF-8"
  decoded
}
70
# Read a single big-endian integer from `con`.
# Yields integer(0) when the connection has no more data.
readInt <- function(con) {
  readBin(con, what = "integer", n = 1L, endian = "big")
}
74
# Read a single big-endian double from `con`.
readDouble <- function(con) {
  readBin(con, what = "double", n = 1L, endian = "big")
}
78
# Read a boolean, which is read as an integer and coerced to logical.
readBoolean <- function(con) {
  flag <- readInt(con)
  as.logical(flag)
}
82
# Read the one-byte type tag that precedes an object.
# Returns "" when no byte could be read (end of stream).
readType <- function(con) {
  tagByte <- readBin(con, "raw", n = 1L)
  rawToChar(tagByte)
}
86
# Read a date, which is read as a string and parsed with as.Date.
readDate <- function(con) {
  dateText <- readString(con)
  as.Date(dateText)
}
90
# Read a timestamp: a double that is converted to POSIXct as an offset
# from the origin 1970-01-01.
readTime <- function(con) {
  secondsSinceOrigin <- readDouble(con)
  as.POSIXct(secondsSinceOrigin, origin = "1970-01-01")
}
95
# Read a homogeneous array: one element type tag, an element count, then
# that many payloads of the given type. The result is an R list.
readArray <- function(con) {
  elemType <- readType(con)
  count <- readInt(con)
  if (!(count > 0)) {
    return(list())
  }
  items <- vector("list", count)
  for (idx in seq_len(count)) {
    items[[idx]] <- readTypedObject(con, elemType)
  }
  items
}
109
# Read a heterogeneous list: an element count followed by that many
# self-describing objects. Elements that deserialize to NULL are stored
# as NA so that every position in the result is kept.
readList <- function(con) {
  count <- readInt(con)
  if (!(count > 0)) {
    return(list())
  }
  lapply(seq_len(count), function(idx) {
    item <- readObject(con)
    if (is.null(item)) NA else item
  })
}
128
# Read a map into a new environment: an entry count followed by that many
# (string key, self-describing value) pairs.
readEnv <- function(con) {
  result <- new.env()
  numEntries <- readInt(con)
  if (numEntries > 0) {
    for (idx in seq_len(numEntries)) {
      entryName <- readString(con)
      result[[entryName]] <- readObject(con)
    }
  }
  result
}
141
# Read a StructType field of a SparkDataFrame: the field names are read
# first, then the field values; the values are named and handed to
# listToStruct to build the "struct" result.
readStruct <- function(con) {
  fieldNames <- readObject(con)
  fieldValues <- readObject(con)
  names(fieldValues) <- fieldNames
  listToStruct(fieldValues)
}
150
# Read a length-prefixed byte array: an integer byte count followed by
# that many raw bytes.
readRaw <- function(con) {
  # Consume the length prefix before reading the payload.
  numBytes <- readInt(con)
  readRawLen(con, numBytes)
}
155
# Read `dataLen` raw bytes from `con`; the length is supplied by the
# caller rather than read from the stream.
readRawLen <- function(con, dataLen) {
  readBin(con, what = raw(), n = as.integer(dataLen), endian = "big")
}
159
# Read one or more length-prefixed chunks of R-serialized data.
#
# If only a single chunk is present, its unserialized value is returned
# as-is. Otherwise every chunk is unserialized in order and the results
# are flattened by one level into a single list.
readDeserialize <- function(con) {
  # Unserialize the next `numBytes` bytes of the stream.
  readChunk <- function(numBytes) {
    unserialize(readBin(con, raw(), as.integer(numBytes), endian = "big"))
  }
  # A further chunk follows only if a positive length was actually read.
  hasChunk <- function(numBytes) {
    length(numBytes) > 0 && numBytes > 0
  }

  firstLen <- readInt(con)
  firstData <- readChunk(firstLen)

  nextLen <- readInt(con)
  if (!hasChunk(nextLen)) {
    # Single-chunk case: the whole payload was one value.
    return(firstData)
  }

  chunks <- list(firstData)
  while (hasChunk(nextLen)) {
    chunks[[length(chunks) + 1L]] <- readChunk(nextLen)
    nextLen <- readInt(con)
  }
  unlist(chunks, recursive = FALSE)
}
182
# Read consecutive self-describing objects from `inputCon` until the end
# of the stream. The stream carries no object count, so reading stops
# when the type tag comes back empty.
readMultipleObjects <- function(inputCon) {
  objects <- list()
  repeat {
    tag <- readType(inputCon)
    # An empty tag means there was nothing left to read.
    if (!nzchar(tag)) {
      break
    }
    objects[[length(objects) + 1L]] <- readTypedObject(inputCon, tag)
  }
  objects
}
199
# Read grouped objects from `inputCon` until the end of the stream; used
# by gapply. The stream carries no counts: each group's rows are followed
# by an "r" marker and then the self-describing grouping key for that
# group, after which the next group starts.
#
# Returns list(keys = ..., data = ...), where data[[i]] holds the rows
# collected before keys[[i]] was read. Rows that are not followed by a
# key marker are not part of the result.
readMultipleObjectsWithKeys <- function(inputCon) {
  groupKeys <- list()
  groupRows <- list()
  pendingRows <- list()
  repeat {
    tag <- readType(inputCon)
    # An empty tag means there was nothing left to read.
    if (!nzchar(tag)) {
      break
    }
    if (tag != "r") {
      # Ordinary row: accumulate it for the group currently being read.
      pendingRows[[length(pendingRows) + 1L]] <- readTypedObject(inputCon, tag)
      next
    }
    # Group boundary: the key follows with its own type tag.
    keyTag <- readType(inputCon)
    groupKey <- readTypedObject(inputCon, keyTag)
    slot <- length(groupRows) + 1L
    groupRows[[slot]] <- pendingRows
    groupKeys[[slot]] <- groupKey
    pendingRows <- list()
  }
  list(keys = groupKeys, data = groupRows)
}
229
# Deserialize a single row held in the raw vector `obj`.
#
# Intended for use inside lapply: each call opens its own raw connection
# over the row's bytes, reads one self-describing object from it, and
# closes the connection on exit.
readRowList <- function(obj) {
  rowCon <- rawConnection(obj, "r+")
  on.exit(close(rowCon), add = TRUE)
  readObject(rowCon)
}
239