#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Utility functions to deserialize objects from Java.

# nolint start
# Type mapping from Java to R
#
# void -> NULL
# Int -> integer
# String -> character
# Boolean -> logical
# Float -> double
# Double -> double
# Long -> double
# Array[Byte] -> raw
# Date -> Date
# Time -> POSIXct
#
# Array[T] -> list()
# Object -> jobj
#
# nolint end

# Read one tagged value: a one-byte type tag followed by its payload.
readObject <- function(con) {
  # Read type first
  type <- readType(con)
  readTypedObject(con, type)
}

# Read the payload for an already-consumed type tag `type`.
# Tag "n" yields NULL without consuming any bytes; unknown tags raise an error.
readTypedObject <- function(con, type) {
  switch(type,
    "i" = readInt(con),
    "c" = readString(con),
    "b" = readBoolean(con),
    "d" = readDouble(con),
    "r" = readRaw(con),
    "D" = readDate(con),
    "t" = readTime(con),
    "a" = readArray(con),
    "l" = readList(con),
    "e" = readEnv(con),
    "s" = readStruct(con),
    "n" = NULL,
    "j" = getJobj(readString(con)),
    stop(paste("Unsupported type for deserialization", type)))
}

# Read a string encoded as a big-endian int byte length followed by the raw
# bytes; the result is marked as UTF-8.
readString <- function(con) {
  stringLen <- readInt(con)
  raw <- readBin(con, raw(), stringLen, endian = "big")
  string <- rawToChar(raw)
  Encoding(string) <- "UTF-8"
  string
}

# Read one big-endian integer. Returns integer(0) at end of stream.
readInt <- function(con) {
  readBin(con, integer(), n = 1, endian = "big")
}

# Read one big-endian double.
readDouble <- function(con) {
  readBin(con, double(), n = 1, endian = "big")
}

# Booleans travel as ints; any non-zero value is TRUE.
readBoolean <- function(con) {
  as.logical(readInt(con))
}

# Read a single-byte type tag as a one-character string.
# At end of stream readBin yields raw(0), so this returns "".
readType <- function(con) {
  rawToChar(readBin(con, "raw", n = 1L))
}

# Dates travel as strings and are parsed with as.Date().
readDate <- function(con) {
  as.Date(readString(con))
}

# Times travel as a double count of seconds relative to 1970-01-01.
readTime <- function(con) {
  t <- readDouble(con)
  as.POSIXct(t, origin = "1970-01-01")
}

# Read a homogeneous array: one element type tag, an int length, then `len`
# payloads of that type. Returns a list (empty when len <= 0).
readArray <- function(con) {
  type <- readType(con)
  len <- readInt(con)
  if (len > 0) {
    l <- vector("list", len)
    for (i in seq_len(len)) {
      # `l[i] <- list(x)` keeps a NULL element (tag "n"); `l[[i]] <- NULL`
      # would delete slot i and shrink the preallocated list.
      l[i] <- list(readTypedObject(con, type))
    }
    l
  } else {
    list()
  }
}

# Read a list. Types of each element may be different.
# Null objects are read as NA.
readList <- function(con) {
  len <- readInt(con)
  if (len > 0) {
    l <- vector("list", len)
    for (i in seq_len(len)) {
      elem <- readObject(con)
      if (is.null(elem)) {
        elem <- NA
      }
      l[[i]] <- elem
    }
    l
  } else {
    list()
  }
}

# Read `len` (string key, tagged value) pairs into a new environment.
readEnv <- function(con) {
  env <- new.env()
  len <- readInt(con)
  if (len > 0) {
    for (i in seq_len(len)) {
      key <- readString(con)
      value <- readObject(con)
      env[[key]] <- value
    }
  }
  env
}

# Read a field of StructType from SparkDataFrame
# into a named list in R whose class is "struct"
readStruct <- function(con) {
  fieldNames <- readObject(con)
  fields <- readObject(con)
  names(fields) <- fieldNames
  listToStruct(fields)
}

# Read a byte array prefixed by its int length.
readRaw <- function(con) {
  readRawLen(con, readInt(con))
}

# Read exactly `dataLen` raw bytes (no length prefix).
readRawLen <- function(con, dataLen) {
  readBin(con, raw(), as.integer(dataLen), endian = "big")
}

# Read length-prefixed R-serialized payloads until a non-positive length or
# end of stream. A single payload is returned as-is; several are combined
# with unlist(recursive = FALSE).
readDeserialize <- function(con) {
  # We have two cases that are possible - In one, the entire partition is
  # encoded as a byte array, so we have only one value to read. If so just
  # return firstData
  dataLen <- readInt(con)
  firstData <- unserialize(
    readBin(con, raw(), as.integer(dataLen), endian = "big"))

  # Else, read things into a list
  dataLen <- readInt(con)
  if (length(dataLen) > 0 && dataLen > 0) {
    data <- list(firstData)
    while (length(dataLen) > 0 && dataLen > 0) {
      data[[length(data) + 1L]] <- unserialize(
        readBin(con, raw(), as.integer(dataLen), endian = "big"))
      dataLen <- readInt(con)
    }
    unlist(data, recursive = FALSE)
  } else {
    firstData
  }
}

readMultipleObjects <- function(inputCon) {
  # readMultipleObjects will read multiple continuous objects from
  # a DataOutputStream. There is no preceding field telling the count
  # of the objects, so the number of objects varies, we try to read
  # all objects in a loop until the end of the stream.
  data <- list()
  while (TRUE) {
    # If reaching the end of the stream, type returned should be "".
    type <- readType(inputCon)
    if (type == "") {
      break
    }
    # Note: a NULL value (tag "n") is not appended, since `[[<-` with NULL
    # is a no-op past the end of a list.
    data[[length(data) + 1L]] <- readTypedObject(inputCon, type)
  }
  data # this is a list of named lists now
}

readMultipleObjectsWithKeys <- function(inputCon) {
  # readMultipleObjectsWithKeys will read multiple continuous objects from
  # a DataOutputStream. There is no preceding field telling the count
  # of the objects, so the number of objects varies, we try to read
  # all objects in a loop until the end of the stream. This function
  # is for use by gapply. Each group of rows is followed by the grouping
  # key for this group which is then followed by next group.
  keys <- list()
  data <- list()
  subData <- list()
  while (TRUE) {
    # If reaching the end of the stream, type returned should be "".
    type <- readType(inputCon)
    if (type == "") {
      break
    } else if (type == "r") {
      type <- readType(inputCon)
      # A grouping boundary detected
      key <- readTypedObject(inputCon, type)
      index <- length(data) + 1L
      # Single-bracket assignment stores the value even if it is NULL, so
      # `keys` and `data` always stay the same length and aligned.
      data[index] <- list(subData)
      keys[index] <- list(key)
      subData <- list()
    } else {
      subData[[length(subData) + 1L]] <- readTypedObject(inputCon, type)
    }
  }
  list(keys = keys, data = data) # this is a list of keys and corresponding data
}

readRowList <- function(obj) {
  # readRowList is meant for use inside an lapply. As a result, it is
  # necessary to open a standalone connection for the row and consume
  # the numCols bytes inside the read function in order to correctly
  # deserialize the row.
  rawObj <- rawConnection(obj, "r+")
  on.exit(close(rawObj), add = TRUE)
  readObject(rawObj)
}