/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

/**
 * A parent class for mutable container objects that are reused when the values are changed,
 * resulting in less garbage.  These values are held by a [[SpecificInternalRow]].
 *
 * Every holder starts out null (`isNull == true`); calling `update` clears the null flag and
 * stores the new value.
 *
 * The following code was roughly used to generate these objects:
 * {{{
 * val types = "Int,Float,Boolean,Double,Short,Long,Byte,Any".split(",")
 * types.map {tpe =>
 * s"""
 * final class Mutable$tpe extends MutableValue {
 *   var value: $tpe = 0
 *   def boxed = if (isNull) null else value
 *   def update(v: Any) = value = {
 *     isNull = false
 *     v.asInstanceOf[$tpe]
 *   }
 *   def copy() = {
 *     val newCopy = new Mutable$tpe
 *     newCopy.isNull = isNull
 *     newCopy.value = value
 *     newCopy
 *   }
 * }"""
 * }.foreach(println)
 *
 * types.map { tpe =>
 * s"""
 *   override def set$tpe(ordinal: Int, value: $tpe): Unit = {
 *     val currentValue = values(ordinal).asInstanceOf[Mutable$tpe]
 *     currentValue.isNull = false
 *     currentValue.value = value
 *   }
 *
 *   override def get$tpe(i: Int): $tpe = {
 *     values(i).asInstanceOf[Mutable$tpe].value
 *   }"""
 * }.foreach(println)
 * }}}
 */
abstract class MutableValue extends Serializable {
  /** Whether the holder currently represents null. Holders start out null. */
  var isNull: Boolean = true

  /** The held value as an object, or `null` when `isNull` is set. */
  def boxed: Any

  /** Clears the null flag and stores `v`, cast to the holder's value type. */
  def update(v: Any): Unit

  /** Returns a new holder of the same kind carrying the same `isNull` flag and value. */
  def copy(): MutableValue
}

/** Reusable holder for an `Int` value. */
final class MutableInt extends MutableValue {
  var value: Int = 0
  override def boxed: Any = if (isNull) null else value
  override def update(v: Any): Unit = {
    isNull = false
    value = v.asInstanceOf[Int]
  }
  override def copy(): MutableInt = {
    val newCopy = new MutableInt
    newCopy.isNull = isNull
    newCopy.value = value
    newCopy
  }
}

/** Reusable holder for a `Float` value. */
final class MutableFloat extends MutableValue {
  var value: Float = 0
  override def boxed: Any = if (isNull) null else value
  override def update(v: Any): Unit = {
    isNull = false
    value = v.asInstanceOf[Float]
  }
  override def copy(): MutableFloat = {
    val newCopy = new MutableFloat
    newCopy.isNull = isNull
    newCopy.value = value
    newCopy
  }
}

/** Reusable holder for a `Boolean` value. */
final class MutableBoolean extends MutableValue {
  var value: Boolean = false
  override def boxed: Any = if (isNull) null else value
  override def update(v: Any): Unit = {
    isNull = false
    value = v.asInstanceOf[Boolean]
  }
  override def copy(): MutableBoolean = {
    val newCopy = new MutableBoolean
    newCopy.isNull = isNull
    newCopy.value = value
    newCopy
  }
}

/** Reusable holder for a `Double` value. */
final class MutableDouble extends MutableValue {
  var value: Double = 0
  override def boxed: Any = if (isNull) null else value
  override def update(v: Any): Unit = {
    isNull = false
    value = v.asInstanceOf[Double]
  }
  override def copy(): MutableDouble = {
    val newCopy = new MutableDouble
    newCopy.isNull = isNull
    newCopy.value = value
    newCopy
  }
}

/** Reusable holder for a `Short` value. */
final class MutableShort extends MutableValue {
  var value: Short = 0
  override def boxed: Any = if (isNull) null else value
  override def update(v: Any): Unit = {
    isNull = false
    value = v.asInstanceOf[Short]
  }
  override def copy(): MutableShort = {
    val newCopy = new MutableShort
    newCopy.isNull = isNull
    newCopy.value = value
    newCopy
  }
}

/** Reusable holder for a `Long` value. */
final class MutableLong extends MutableValue {
  var value: Long = 0
  override def boxed: Any = if (isNull) null else value
  override def update(v: Any): Unit = {
    isNull = false
    value = v.asInstanceOf[Long]
  }
  override def copy(): MutableLong = {
    val newCopy = new MutableLong
    newCopy.isNull = isNull
    newCopy.value = value
    newCopy
  }
}

/** Reusable holder for a `Byte` value. */
final class MutableByte extends MutableValue {
  var value: Byte = 0
  override def boxed: Any = if (isNull) null else value
  override def update(v: Any): Unit = {
    isNull = false
    value = v.asInstanceOf[Byte]
  }
  override def copy(): MutableByte = {
    val newCopy = new MutableByte
    newCopy.isNull = isNull
    newCopy.value = value
    newCopy
  }
}

/**
 * Reusable holder for a value of any type. `copy()` copies the reference only; the referenced
 * object itself is shared between the original and the copy.
 */
final class MutableAny extends MutableValue {
  var value: Any = _
  override def boxed: Any = if (isNull) null else value
  override def update(v: Any): Unit = {
    isNull = false
    value = v
  }
  override def copy(): MutableAny = {
    val newCopy = new MutableAny
    newCopy.isNull = isNull
    newCopy.value = value
    newCopy
  }
}

/**
 * A row type that holds an array of specialized container objects, of type [[MutableValue]],
 * chosen based on the dataTypes of each column.  The intent is to decrease garbage when
 * modifying the values of primitive columns.
 *
 * @param values one holder per column; the array is used directly, not copied.
 */
final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGenericInternalRow {

  /**
   * Creates a row with one freshly allocated (initially null) holder per data type. Types
   * without a dedicated primitive holder fall back to [[MutableAny]].
   */
  def this(dataTypes: Seq[DataType]) =
    this(
      dataTypes.map {
        case BooleanType => new MutableBoolean
        case ByteType => new MutableByte
        case ShortType => new MutableShort
        // We use INT for DATE internally
        case IntegerType | DateType => new MutableInt
        // We use Long for Timestamp internally
        case LongType | TimestampType => new MutableLong
        case FloatType => new MutableFloat
        case DoubleType => new MutableDouble
        case _ => new MutableAny
      }.toArray)

  /** Creates a row with no columns. */
  def this() = this(Seq.empty)

  /** Creates a row with one holder per field of `schema`, chosen from the field's data type. */
  def this(schema: StructType) = this(schema.fields.map(_.dataType))

  override def numFields: Int = values.length

  /** Marks column `i` as null. The previously stored value is left in the holder. */
  override def setNullAt(i: Int): Unit = {
    values(i).isNull = true
  }

  override def isNullAt(i: Int): Boolean = values(i).isNull

  /**
   * Snapshots the current column values into a new [[GenericInternalRow]]. Each value is taken
   * from the holder's `boxed` accessor, so null columns become `null` entries.
   */
  override def copy(): InternalRow = {
    val newValues = new Array[Any](values.length)
    var i = 0
    while (i < values.length) {
      newValues(i) = values(i).boxed
      i += 1
    }

    new GenericInternalRow(newValues)
  }

  override protected def genericGet(i: Int): Any = values(i).boxed

  /**
   * Stores `value` in column `ordinal`. A `null` value is routed to `setNullAt` instead of the
   * holder's `update`, because the holder's `update` unconditionally clears the null flag.
   */
  override def update(ordinal: Int, value: Any): Unit = {
    if (value == null) {
      setNullAt(ordinal)
    } else {
      values(ordinal).update(value)
    }
  }

  // Typed accessors. Each one casts the holder at the given ordinal to the matching Mutable*
  // class, so calling it on a column backed by a different holder type fails the cast.
  // The setters clear the null flag; the getters return the raw stored value without
  // consulting `isNull`.

  override def setInt(ordinal: Int, value: Int): Unit = {
    val currentValue = values(ordinal).asInstanceOf[MutableInt]
    currentValue.isNull = false
    currentValue.value = value
  }

  override def getInt(i: Int): Int = {
    values(i).asInstanceOf[MutableInt].value
  }

  override def setFloat(ordinal: Int, value: Float): Unit = {
    val currentValue = values(ordinal).asInstanceOf[MutableFloat]
    currentValue.isNull = false
    currentValue.value = value
  }

  override def getFloat(i: Int): Float = {
    values(i).asInstanceOf[MutableFloat].value
  }

  override def setBoolean(ordinal: Int, value: Boolean): Unit = {
    val currentValue = values(ordinal).asInstanceOf[MutableBoolean]
    currentValue.isNull = false
    currentValue.value = value
  }

  override def getBoolean(i: Int): Boolean = {
    values(i).asInstanceOf[MutableBoolean].value
  }

  override def setDouble(ordinal: Int, value: Double): Unit = {
    val currentValue = values(ordinal).asInstanceOf[MutableDouble]
    currentValue.isNull = false
    currentValue.value = value
  }

  override def getDouble(i: Int): Double = {
    values(i).asInstanceOf[MutableDouble].value
  }

  override def setShort(ordinal: Int, value: Short): Unit = {
    val currentValue = values(ordinal).asInstanceOf[MutableShort]
    currentValue.isNull = false
    currentValue.value = value
  }

  override def getShort(i: Int): Short = {
    values(i).asInstanceOf[MutableShort].value
  }

  override def setLong(ordinal: Int, value: Long): Unit = {
    val currentValue = values(ordinal).asInstanceOf[MutableLong]
    currentValue.isNull = false
    currentValue.value = value
  }

  override def getLong(i: Int): Long = {
    values(i).asInstanceOf[MutableLong].value
  }

  override def setByte(ordinal: Int, value: Byte): Unit = {
    val currentValue = values(ordinal).asInstanceOf[MutableByte]
    currentValue.isNull = false
    currentValue.value = value
  }

  override def getByte(i: Int): Byte = {
    values(i).asInstanceOf[MutableByte].value
  }
}