1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.sql.catalyst.expressions
19
20import org.apache.spark.sql.catalyst.InternalRow
21import org.apache.spark.sql.types._
22
23/**
24 * A parent class for mutable container objects that are reused when the values are changed,
25 * resulting in less garbage.  These values are held by a [[SpecificInternalRow]].
26 *
27 * The following code was roughly used to generate these objects:
28 * {{{
29 * val types = "Int,Float,Boolean,Double,Short,Long,Byte,Any".split(",")
30 * types.map {tpe =>
31 * s"""
32 * final class Mutable$tpe extends MutableValue {
33 *   var value: $tpe = 0
34 *   def boxed = if (isNull) null else value
35 *   def update(v: Any) = value = {
36 *     isNull = false
37 *     v.asInstanceOf[$tpe]
38 *   }
39 *   def copy() = {
40 *     val newCopy = new Mutable$tpe
41 *     newCopy.isNull = isNull
42 *     newCopy.value = value
43 *     newCopy
44 *   }
45 * }"""
46 * }.foreach(println)
47 *
48 * types.map { tpe =>
49 * s"""
50 *   override def set$tpe(ordinal: Int, value: $tpe): Unit = {
51 *     val currentValue = values(ordinal).asInstanceOf[Mutable$tpe]
52 *     currentValue.isNull = false
53 *     currentValue.value = value
54 *   }
55 *
56 *   override def get$tpe(i: Int): $tpe = {
57 *     values(i).asInstanceOf[Mutable$tpe].value
58 *   }"""
59 * }.foreach(println)
60 * }}}
61 */
62abstract class MutableValue extends Serializable {
63  var isNull: Boolean = true
64  def boxed: Any
65  def update(v: Any): Unit
66  def copy(): MutableValue
67}
68
69final class MutableInt extends MutableValue {
70  var value: Int = 0
71  override def boxed: Any = if (isNull) null else value
72  override def update(v: Any): Unit = {
73    isNull = false
74    value = v.asInstanceOf[Int]
75  }
76  override def copy(): MutableInt = {
77    val newCopy = new MutableInt
78    newCopy.isNull = isNull
79    newCopy.value = value
80    newCopy
81  }
82}
83
84final class MutableFloat extends MutableValue {
85  var value: Float = 0
86  override def boxed: Any = if (isNull) null else value
87  override def update(v: Any): Unit = {
88    isNull = false
89    value = v.asInstanceOf[Float]
90  }
91  override def copy(): MutableFloat = {
92    val newCopy = new MutableFloat
93    newCopy.isNull = isNull
94    newCopy.value = value
95    newCopy
96  }
97}
98
99final class MutableBoolean extends MutableValue {
100  var value: Boolean = false
101  override def boxed: Any = if (isNull) null else value
102  override def update(v: Any): Unit = {
103    isNull = false
104    value = v.asInstanceOf[Boolean]
105  }
106  override def copy(): MutableBoolean = {
107    val newCopy = new MutableBoolean
108    newCopy.isNull = isNull
109    newCopy.value = value
110    newCopy
111  }
112}
113
114final class MutableDouble extends MutableValue {
115  var value: Double = 0
116  override def boxed: Any = if (isNull) null else value
117  override def update(v: Any): Unit = {
118    isNull = false
119    value = v.asInstanceOf[Double]
120  }
121  override def copy(): MutableDouble = {
122    val newCopy = new MutableDouble
123    newCopy.isNull = isNull
124    newCopy.value = value
125    newCopy
126  }
127}
128
129final class MutableShort extends MutableValue {
130  var value: Short = 0
131  override def boxed: Any = if (isNull) null else value
132  override def update(v: Any): Unit = value = {
133    isNull = false
134    v.asInstanceOf[Short]
135  }
136  override def copy(): MutableShort = {
137    val newCopy = new MutableShort
138    newCopy.isNull = isNull
139    newCopy.value = value
140    newCopy
141  }
142}
143
144final class MutableLong extends MutableValue {
145  var value: Long = 0
146  override def boxed: Any = if (isNull) null else value
147  override def update(v: Any): Unit = value = {
148    isNull = false
149    v.asInstanceOf[Long]
150  }
151  override def copy(): MutableLong = {
152    val newCopy = new MutableLong
153    newCopy.isNull = isNull
154    newCopy.value = value
155    newCopy
156  }
157}
158
159final class MutableByte extends MutableValue {
160  var value: Byte = 0
161  override def boxed: Any = if (isNull) null else value
162  override def update(v: Any): Unit = value = {
163    isNull = false
164    v.asInstanceOf[Byte]
165  }
166  override def copy(): MutableByte = {
167    val newCopy = new MutableByte
168    newCopy.isNull = isNull
169    newCopy.value = value
170    newCopy
171  }
172}
173
174final class MutableAny extends MutableValue {
175  var value: Any = _
176  override def boxed: Any = if (isNull) null else value
177  override def update(v: Any): Unit = {
178    isNull = false
179    value = v.asInstanceOf[Any]
180  }
181  override def copy(): MutableAny = {
182    val newCopy = new MutableAny
183    newCopy.isNull = isNull
184    newCopy.value = value
185    newCopy
186  }
187}
188
189/**
190 * A row type that holds an array specialized container objects, of type [[MutableValue]], chosen
191 * based on the dataTypes of each column.  The intent is to decrease garbage when modifying the
192 * values of primitive columns.
193 */
194final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGenericInternalRow {
195
196  def this(dataTypes: Seq[DataType]) =
197    this(
198      dataTypes.map {
199        case BooleanType => new MutableBoolean
200        case ByteType => new MutableByte
201        case ShortType => new MutableShort
202        // We use INT for DATE internally
203        case IntegerType | DateType => new MutableInt
204        // We use Long for Timestamp internally
205        case LongType | TimestampType => new MutableLong
206        case FloatType => new MutableFloat
207        case DoubleType => new MutableDouble
208        case _ => new MutableAny
209      }.toArray)
210
211  def this() = this(Seq.empty)
212
213  def this(schema: StructType) = this(schema.fields.map(_.dataType))
214
215  override def numFields: Int = values.length
216
217  override def setNullAt(i: Int): Unit = {
218    values(i).isNull = true
219  }
220
221  override def isNullAt(i: Int): Boolean = values(i).isNull
222
223  override def copy(): InternalRow = {
224    val newValues = new Array[Any](values.length)
225    var i = 0
226    while (i < values.length) {
227      newValues(i) = values(i).boxed
228      i += 1
229    }
230
231    new GenericInternalRow(newValues)
232  }
233
234  override protected def genericGet(i: Int): Any = values(i).boxed
235
236  override def update(ordinal: Int, value: Any) {
237    if (value == null) {
238      setNullAt(ordinal)
239    } else {
240      values(ordinal).update(value)
241    }
242  }
243
244  override def setInt(ordinal: Int, value: Int): Unit = {
245    val currentValue = values(ordinal).asInstanceOf[MutableInt]
246    currentValue.isNull = false
247    currentValue.value = value
248  }
249
250  override def getInt(i: Int): Int = {
251    values(i).asInstanceOf[MutableInt].value
252  }
253
254  override def setFloat(ordinal: Int, value: Float): Unit = {
255    val currentValue = values(ordinal).asInstanceOf[MutableFloat]
256    currentValue.isNull = false
257    currentValue.value = value
258  }
259
260  override def getFloat(i: Int): Float = {
261    values(i).asInstanceOf[MutableFloat].value
262  }
263
264  override def setBoolean(ordinal: Int, value: Boolean): Unit = {
265    val currentValue = values(ordinal).asInstanceOf[MutableBoolean]
266    currentValue.isNull = false
267    currentValue.value = value
268  }
269
270  override def getBoolean(i: Int): Boolean = {
271    values(i).asInstanceOf[MutableBoolean].value
272  }
273
274  override def setDouble(ordinal: Int, value: Double): Unit = {
275    val currentValue = values(ordinal).asInstanceOf[MutableDouble]
276    currentValue.isNull = false
277    currentValue.value = value
278  }
279
280  override def getDouble(i: Int): Double = {
281    values(i).asInstanceOf[MutableDouble].value
282  }
283
284  override def setShort(ordinal: Int, value: Short): Unit = {
285    val currentValue = values(ordinal).asInstanceOf[MutableShort]
286    currentValue.isNull = false
287    currentValue.value = value
288  }
289
290  override def getShort(i: Int): Short = {
291    values(i).asInstanceOf[MutableShort].value
292  }
293
294  override def setLong(ordinal: Int, value: Long): Unit = {
295    val currentValue = values(ordinal).asInstanceOf[MutableLong]
296    currentValue.isNull = false
297    currentValue.value = value
298  }
299
300  override def getLong(i: Int): Long = {
301    values(i).asInstanceOf[MutableLong].value
302  }
303
304  override def setByte(ordinal: Int, value: Byte): Unit = {
305    val currentValue = values(ordinal).asInstanceOf[MutableByte]
306    currentValue.isNull = false
307    currentValue.value = value
308  }
309
310  override def getByte(i: Int): Byte = {
311    values(i).asInstanceOf[MutableByte].value
312  }
313}
314