1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *    http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 package org.apache.spark.sql.catalyst.expressions;
19 
20 import java.io.*;
21 import java.math.BigDecimal;
22 import java.math.BigInteger;
23 import java.nio.ByteBuffer;
24 import java.util.Arrays;
25 import java.util.Collections;
26 import java.util.HashSet;
27 import java.util.Set;
28 
29 import com.esotericsoftware.kryo.Kryo;
30 import com.esotericsoftware.kryo.KryoSerializable;
31 import com.esotericsoftware.kryo.io.Input;
32 import com.esotericsoftware.kryo.io.Output;
33 
34 import org.apache.spark.sql.catalyst.InternalRow;
35 import org.apache.spark.sql.types.*;
36 import org.apache.spark.unsafe.Platform;
37 import org.apache.spark.unsafe.array.ByteArrayMethods;
38 import org.apache.spark.unsafe.bitset.BitSetMethods;
39 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
40 import org.apache.spark.unsafe.types.CalendarInterval;
41 import org.apache.spark.unsafe.types.UTF8String;
42 
43 import static org.apache.spark.sql.types.DataTypes.*;
44 import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
45 
46 /**
47  * An Unsafe implementation of Row which is backed by raw memory instead of Java objects.
48  *
49  * Each tuple has three parts: [null bit set] [values] [variable length portion]
50  *
51  * The bit set is used for null tracking and is aligned to 8-byte word boundaries.  It stores
52  * one bit per field.
53  *
54  * In the `values` region, we store one 8-byte word per field. For fields that hold fixed-length
55  * primitive types, such as long, double, or int, we store the value directly in the word. For
56  * fields with non-primitive or variable-length values, we store a relative offset (w.r.t. the
57  * base address of the row) that points to the beginning of the variable-length field, and length
58  * (they are combined into a long).
59  *
60  * Instances of `UnsafeRow` act as pointers to row data stored in this format.
61  */
62 public final class UnsafeRow extends InternalRow implements Externalizable, KryoSerializable {
63 
64   //////////////////////////////////////////////////////////////////////////////
65   // Static methods
66   //////////////////////////////////////////////////////////////////////////////
67 
calculateBitSetWidthInBytes(int numFields)68   public static int calculateBitSetWidthInBytes(int numFields) {
69     return ((numFields + 63)/ 64) * 8;
70   }
71 
calculateFixedPortionByteSize(int numFields)72   public static int calculateFixedPortionByteSize(int numFields) {
73     return 8 * numFields + calculateBitSetWidthInBytes(numFields);
74   }
75 
76   /**
77    * Field types that can be updated in place in UnsafeRows (e.g. we support set() for these types)
78    */
79   public static final Set<DataType> mutableFieldTypes;
80 
81   // DecimalType is also mutable
82   static {
83     mutableFieldTypes = Collections.unmodifiableSet(
84       new HashSet<>(
85         Arrays.asList(new DataType[] {
86           NullType,
87           BooleanType,
88           ByteType,
89           ShortType,
90           IntegerType,
91           LongType,
92           FloatType,
93           DoubleType,
94           DateType,
95           TimestampType
96         })));
97   }
98 
isFixedLength(DataType dt)99   public static boolean isFixedLength(DataType dt) {
100     if (dt instanceof DecimalType) {
101       return ((DecimalType) dt).precision() <= Decimal.MAX_LONG_DIGITS();
102     } else {
103       return mutableFieldTypes.contains(dt);
104     }
105   }
106 
isMutable(DataType dt)107   public static boolean isMutable(DataType dt) {
108     return mutableFieldTypes.contains(dt) || dt instanceof DecimalType;
109   }
110 
111   //////////////////////////////////////////////////////////////////////////////
112   // Private fields and methods
113   //////////////////////////////////////////////////////////////////////////////
114 
  /** The base object that, together with baseOffset, addresses this row's data in Platform calls */
  private Object baseObject;
  /** The offset, relative to baseObject, at which this row's data begins */
  private long baseOffset;

  /** The number of fields in this row, used for calculating the bitset width (and in assertions) */
  private int numFields;

  /** The size of this row's backing data, in bytes */
  private int sizeInBytes;

  /** The width of the null tracking bit set, in bytes */
  private int bitSetWidthInBytes;
126 
getFieldOffset(int ordinal)127   private long getFieldOffset(int ordinal) {
128     return baseOffset + bitSetWidthInBytes + ordinal * 8L;
129   }
130 
assertIndexIsValid(int index)131   private void assertIndexIsValid(int index) {
132     assert index >= 0 : "index (" + index + ") should >= 0";
133     assert index < numFields : "index (" + index + ") should < " + numFields;
134   }
135 
136   //////////////////////////////////////////////////////////////////////////////
137   // Public methods
138   //////////////////////////////////////////////////////////////////////////////
139 
140   /**
141    * Construct a new UnsafeRow. The resulting row won't be usable until `pointTo()` has been called,
142    * since the value returned by this constructor is equivalent to a null pointer.
143    *
144    * @param numFields the number of fields in this row
145    */
146   public UnsafeRow(int numFields) {
147     this.numFields = numFields;
148     this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
149   }
150 
  // No-argument constructor for the serializers (Externalizable / KryoSerializable);
  // readExternal() and read() assign every field of the row afterwards.
  public UnsafeRow() {}
153 
154   public Object getBaseObject() { return baseObject; }
155   public long getBaseOffset() { return baseOffset; }
156   public int getSizeInBytes() { return sizeInBytes; }
157 
158   @Override
159   public int numFields() { return numFields; }
160 
161   /**
162    * Update this UnsafeRow to point to different backing data.
163    *
164    * @param baseObject the base object
165    * @param baseOffset the offset within the base object
166    * @param sizeInBytes the size of this row's backing data, in bytes
167    */
168   public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
169     assert numFields >= 0 : "numFields (" + numFields + ") should >= 0";
170     this.baseObject = baseObject;
171     this.baseOffset = baseOffset;
172     this.sizeInBytes = sizeInBytes;
173   }
174 
175   /**
176    * Update this UnsafeRow to point to the underlying byte array.
177    *
178    * @param buf byte array to point to
179    * @param sizeInBytes the number of bytes valid in the byte array
180    */
pointTo(byte[] buf, int sizeInBytes)181   public void pointTo(byte[] buf, int sizeInBytes) {
182     pointTo(buf, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
183   }
184 
setTotalSize(int sizeInBytes)185   public void setTotalSize(int sizeInBytes) {
186     this.sizeInBytes = sizeInBytes;
187   }
188 
setNotNullAt(int i)189   public void setNotNullAt(int i) {
190     assertIndexIsValid(i);
191     BitSetMethods.unset(baseObject, baseOffset, i);
192   }
193 
194   @Override
setNullAt(int i)195   public void setNullAt(int i) {
196     assertIndexIsValid(i);
197     BitSetMethods.set(baseObject, baseOffset, i);
198     // To preserve row equality, zero out the value when setting the column to null.
199     // Since this row does does not currently support updates to variable-length values, we don't
200     // have to worry about zeroing out that data.
201     Platform.putLong(baseObject, getFieldOffset(i), 0);
202   }
203 
204   @Override
update(int ordinal, Object value)205   public void update(int ordinal, Object value) {
206     throw new UnsupportedOperationException();
207   }
208 
209   @Override
setInt(int ordinal, int value)210   public void setInt(int ordinal, int value) {
211     assertIndexIsValid(ordinal);
212     setNotNullAt(ordinal);
213     Platform.putInt(baseObject, getFieldOffset(ordinal), value);
214   }
215 
216   @Override
setLong(int ordinal, long value)217   public void setLong(int ordinal, long value) {
218     assertIndexIsValid(ordinal);
219     setNotNullAt(ordinal);
220     Platform.putLong(baseObject, getFieldOffset(ordinal), value);
221   }
222 
223   @Override
setDouble(int ordinal, double value)224   public void setDouble(int ordinal, double value) {
225     assertIndexIsValid(ordinal);
226     setNotNullAt(ordinal);
227     if (Double.isNaN(value)) {
228       value = Double.NaN;
229     }
230     Platform.putDouble(baseObject, getFieldOffset(ordinal), value);
231   }
232 
233   @Override
setBoolean(int ordinal, boolean value)234   public void setBoolean(int ordinal, boolean value) {
235     assertIndexIsValid(ordinal);
236     setNotNullAt(ordinal);
237     Platform.putBoolean(baseObject, getFieldOffset(ordinal), value);
238   }
239 
240   @Override
setShort(int ordinal, short value)241   public void setShort(int ordinal, short value) {
242     assertIndexIsValid(ordinal);
243     setNotNullAt(ordinal);
244     Platform.putShort(baseObject, getFieldOffset(ordinal), value);
245   }
246 
247   @Override
setByte(int ordinal, byte value)248   public void setByte(int ordinal, byte value) {
249     assertIndexIsValid(ordinal);
250     setNotNullAt(ordinal);
251     Platform.putByte(baseObject, getFieldOffset(ordinal), value);
252   }
253 
254   @Override
setFloat(int ordinal, float value)255   public void setFloat(int ordinal, float value) {
256     assertIndexIsValid(ordinal);
257     setNotNullAt(ordinal);
258     if (Float.isNaN(value)) {
259       value = Float.NaN;
260     }
261     Platform.putFloat(baseObject, getFieldOffset(ordinal), value);
262   }
263 
264   /**
265    * Updates the decimal column.
266    *
267    * Note: In order to support update a decimal with precision > 18, CAN NOT call
268    * setNullAt() for this column.
269    */
270   @Override
setDecimal(int ordinal, Decimal value, int precision)271   public void setDecimal(int ordinal, Decimal value, int precision) {
272     assertIndexIsValid(ordinal);
273     if (precision <= Decimal.MAX_LONG_DIGITS()) {
274       // compact format
275       if (value == null) {
276         setNullAt(ordinal);
277       } else {
278         setLong(ordinal, value.toUnscaledLong());
279       }
280     } else {
281       // fixed length
282       long cursor = getLong(ordinal) >>> 32;
283       assert cursor > 0 : "invalid cursor " + cursor;
284       // zero-out the bytes
285       Platform.putLong(baseObject, baseOffset + cursor, 0L);
286       Platform.putLong(baseObject, baseOffset + cursor + 8, 0L);
287 
288       if (value == null) {
289         setNullAt(ordinal);
290         // keep the offset for future update
291         Platform.putLong(baseObject, getFieldOffset(ordinal), cursor << 32);
292       } else {
293 
294         final BigInteger integer = value.toJavaBigDecimal().unscaledValue();
295         byte[] bytes = integer.toByteArray();
296         assert(bytes.length <= 16);
297 
298         // Write the bytes to the variable length portion.
299         Platform.copyMemory(
300           bytes, Platform.BYTE_ARRAY_OFFSET, baseObject, baseOffset + cursor, bytes.length);
301         setLong(ordinal, (cursor << 32) | ((long) bytes.length));
302       }
303     }
304   }
305 
306   @Override
get(int ordinal, DataType dataType)307   public Object get(int ordinal, DataType dataType) {
308     if (isNullAt(ordinal) || dataType instanceof NullType) {
309       return null;
310     } else if (dataType instanceof BooleanType) {
311       return getBoolean(ordinal);
312     } else if (dataType instanceof ByteType) {
313       return getByte(ordinal);
314     } else if (dataType instanceof ShortType) {
315       return getShort(ordinal);
316     } else if (dataType instanceof IntegerType) {
317       return getInt(ordinal);
318     } else if (dataType instanceof LongType) {
319       return getLong(ordinal);
320     } else if (dataType instanceof FloatType) {
321       return getFloat(ordinal);
322     } else if (dataType instanceof DoubleType) {
323       return getDouble(ordinal);
324     } else if (dataType instanceof DecimalType) {
325       DecimalType dt = (DecimalType) dataType;
326       return getDecimal(ordinal, dt.precision(), dt.scale());
327     } else if (dataType instanceof DateType) {
328       return getInt(ordinal);
329     } else if (dataType instanceof TimestampType) {
330       return getLong(ordinal);
331     } else if (dataType instanceof BinaryType) {
332       return getBinary(ordinal);
333     } else if (dataType instanceof StringType) {
334       return getUTF8String(ordinal);
335     } else if (dataType instanceof CalendarIntervalType) {
336       return getInterval(ordinal);
337     } else if (dataType instanceof StructType) {
338       return getStruct(ordinal, ((StructType) dataType).size());
339     } else if (dataType instanceof ArrayType) {
340       return getArray(ordinal);
341     } else if (dataType instanceof MapType) {
342       return getMap(ordinal);
343     } else if (dataType instanceof UserDefinedType) {
344       return get(ordinal, ((UserDefinedType)dataType).sqlType());
345     } else {
346       throw new UnsupportedOperationException("Unsupported data type " + dataType.simpleString());
347     }
348   }
349 
350   @Override
isNullAt(int ordinal)351   public boolean isNullAt(int ordinal) {
352     assertIndexIsValid(ordinal);
353     return BitSetMethods.isSet(baseObject, baseOffset, ordinal);
354   }
355 
356   @Override
getBoolean(int ordinal)357   public boolean getBoolean(int ordinal) {
358     assertIndexIsValid(ordinal);
359     return Platform.getBoolean(baseObject, getFieldOffset(ordinal));
360   }
361 
362   @Override
getByte(int ordinal)363   public byte getByte(int ordinal) {
364     assertIndexIsValid(ordinal);
365     return Platform.getByte(baseObject, getFieldOffset(ordinal));
366   }
367 
368   @Override
getShort(int ordinal)369   public short getShort(int ordinal) {
370     assertIndexIsValid(ordinal);
371     return Platform.getShort(baseObject, getFieldOffset(ordinal));
372   }
373 
374   @Override
getInt(int ordinal)375   public int getInt(int ordinal) {
376     assertIndexIsValid(ordinal);
377     return Platform.getInt(baseObject, getFieldOffset(ordinal));
378   }
379 
380   @Override
getLong(int ordinal)381   public long getLong(int ordinal) {
382     assertIndexIsValid(ordinal);
383     return Platform.getLong(baseObject, getFieldOffset(ordinal));
384   }
385 
386   @Override
getFloat(int ordinal)387   public float getFloat(int ordinal) {
388     assertIndexIsValid(ordinal);
389     return Platform.getFloat(baseObject, getFieldOffset(ordinal));
390   }
391 
392   @Override
getDouble(int ordinal)393   public double getDouble(int ordinal) {
394     assertIndexIsValid(ordinal);
395     return Platform.getDouble(baseObject, getFieldOffset(ordinal));
396   }
397 
398   @Override
getDecimal(int ordinal, int precision, int scale)399   public Decimal getDecimal(int ordinal, int precision, int scale) {
400     if (isNullAt(ordinal)) {
401       return null;
402     }
403     if (precision <= Decimal.MAX_LONG_DIGITS()) {
404       return Decimal.createUnsafe(getLong(ordinal), precision, scale);
405     } else {
406       byte[] bytes = getBinary(ordinal);
407       BigInteger bigInteger = new BigInteger(bytes);
408       BigDecimal javaDecimal = new BigDecimal(bigInteger, scale);
409       return Decimal.apply(javaDecimal, precision, scale);
410     }
411   }
412 
413   @Override
getUTF8String(int ordinal)414   public UTF8String getUTF8String(int ordinal) {
415     if (isNullAt(ordinal)) return null;
416     final long offsetAndSize = getLong(ordinal);
417     final int offset = (int) (offsetAndSize >> 32);
418     final int size = (int) offsetAndSize;
419     return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
420   }
421 
422   @Override
getBinary(int ordinal)423   public byte[] getBinary(int ordinal) {
424     if (isNullAt(ordinal)) {
425       return null;
426     } else {
427       final long offsetAndSize = getLong(ordinal);
428       final int offset = (int) (offsetAndSize >> 32);
429       final int size = (int) offsetAndSize;
430       final byte[] bytes = new byte[size];
431       Platform.copyMemory(
432         baseObject,
433         baseOffset + offset,
434         bytes,
435         Platform.BYTE_ARRAY_OFFSET,
436         size
437       );
438       return bytes;
439     }
440   }
441 
442   @Override
getInterval(int ordinal)443   public CalendarInterval getInterval(int ordinal) {
444     if (isNullAt(ordinal)) {
445       return null;
446     } else {
447       final long offsetAndSize = getLong(ordinal);
448       final int offset = (int) (offsetAndSize >> 32);
449       final int months = (int) Platform.getLong(baseObject, baseOffset + offset);
450       final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 8);
451       return new CalendarInterval(months, microseconds);
452     }
453   }
454 
455   @Override
getStruct(int ordinal, int numFields)456   public UnsafeRow getStruct(int ordinal, int numFields) {
457     if (isNullAt(ordinal)) {
458       return null;
459     } else {
460       final long offsetAndSize = getLong(ordinal);
461       final int offset = (int) (offsetAndSize >> 32);
462       final int size = (int) offsetAndSize;
463       final UnsafeRow row = new UnsafeRow(numFields);
464       row.pointTo(baseObject, baseOffset + offset, size);
465       return row;
466     }
467   }
468 
469   @Override
getArray(int ordinal)470   public UnsafeArrayData getArray(int ordinal) {
471     if (isNullAt(ordinal)) {
472       return null;
473     } else {
474       final long offsetAndSize = getLong(ordinal);
475       final int offset = (int) (offsetAndSize >> 32);
476       final int size = (int) offsetAndSize;
477       final UnsafeArrayData array = new UnsafeArrayData();
478       array.pointTo(baseObject, baseOffset + offset, size);
479       return array;
480     }
481   }
482 
483   @Override
getMap(int ordinal)484   public UnsafeMapData getMap(int ordinal) {
485     if (isNullAt(ordinal)) {
486       return null;
487     } else {
488       final long offsetAndSize = getLong(ordinal);
489       final int offset = (int) (offsetAndSize >> 32);
490       final int size = (int) offsetAndSize;
491       final UnsafeMapData map = new UnsafeMapData();
492       map.pointTo(baseObject, baseOffset + offset, size);
493       return map;
494     }
495   }
496 
497   /**
498    * Copies this row, returning a self-contained UnsafeRow that stores its data in an internal
499    * byte array rather than referencing data stored in a data page.
500    */
501   @Override
copy()502   public UnsafeRow copy() {
503     UnsafeRow rowCopy = new UnsafeRow(numFields);
504     final byte[] rowDataCopy = new byte[sizeInBytes];
505     Platform.copyMemory(
506       baseObject,
507       baseOffset,
508       rowDataCopy,
509       Platform.BYTE_ARRAY_OFFSET,
510       sizeInBytes
511     );
512     rowCopy.pointTo(rowDataCopy, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
513     return rowCopy;
514   }
515 
516   /**
517    * Creates an empty UnsafeRow from a byte array with specified numBytes and numFields.
518    * The returned row is invalid until we call copyFrom on it.
519    */
createFromByteArray(int numBytes, int numFields)520   public static UnsafeRow createFromByteArray(int numBytes, int numFields) {
521     final UnsafeRow row = new UnsafeRow(numFields);
522     row.pointTo(new byte[numBytes], numBytes);
523     return row;
524   }
525 
526   /**
527    * Copies the input UnsafeRow to this UnsafeRow, and resize the underlying byte[] when the
528    * input row is larger than this row.
529    */
copyFrom(UnsafeRow row)530   public void copyFrom(UnsafeRow row) {
531     // copyFrom is only available for UnsafeRow created from byte array.
532     assert (baseObject instanceof byte[]) && baseOffset == Platform.BYTE_ARRAY_OFFSET;
533     if (row.sizeInBytes > this.sizeInBytes) {
534       // resize the underlying byte[] if it's not large enough.
535       this.baseObject = new byte[row.sizeInBytes];
536     }
537     Platform.copyMemory(
538       row.baseObject, row.baseOffset, this.baseObject, this.baseOffset, row.sizeInBytes);
539     // update the sizeInBytes.
540     this.sizeInBytes = row.sizeInBytes;
541   }
542 
543   /**
544    * Write this UnsafeRow's underlying bytes to the given OutputStream.
545    *
546    * @param out the stream to write to.
547    * @param writeBuffer a byte array for buffering chunks of off-heap data while writing to the
548    *                    output stream. If this row is backed by an on-heap byte array, then this
549    *                    buffer will not be used and may be null.
550    */
writeToStream(OutputStream out, byte[] writeBuffer)551   public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException {
552     if (baseObject instanceof byte[]) {
553       int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset);
554       out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes);
555     } else {
556       int dataRemaining = sizeInBytes;
557       long rowReadPosition = baseOffset;
558       while (dataRemaining > 0) {
559         int toTransfer = Math.min(writeBuffer.length, dataRemaining);
560         Platform.copyMemory(
561           baseObject, rowReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
562         out.write(writeBuffer, 0, toTransfer);
563         rowReadPosition += toTransfer;
564         dataRemaining -= toTransfer;
565       }
566     }
567   }
568 
569   @Override
hashCode()570   public int hashCode() {
571     return Murmur3_x86_32.hashUnsafeWords(baseObject, baseOffset, sizeInBytes, 42);
572   }
573 
574   @Override
equals(Object other)575   public boolean equals(Object other) {
576     if (other instanceof UnsafeRow) {
577       UnsafeRow o = (UnsafeRow) other;
578       return (sizeInBytes == o.sizeInBytes) &&
579         ByteArrayMethods.arrayEquals(baseObject, baseOffset, o.baseObject, o.baseOffset,
580           sizeInBytes);
581     }
582     return false;
583   }
584 
585   /**
586    * Returns the underlying bytes for this UnsafeRow.
587    */
getBytes()588   public byte[] getBytes() {
589     if (baseObject instanceof byte[] && baseOffset == Platform.BYTE_ARRAY_OFFSET
590       && (((byte[]) baseObject).length == sizeInBytes)) {
591       return (byte[]) baseObject;
592     } else {
593       byte[] bytes = new byte[sizeInBytes];
594       Platform.copyMemory(baseObject, baseOffset, bytes, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
595       return bytes;
596     }
597   }
598 
599   // This is for debugging
600   @Override
toString()601   public String toString() {
602     StringBuilder build = new StringBuilder("[");
603     for (int i = 0; i < sizeInBytes; i += 8) {
604       if (i != 0) build.append(',');
605       build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
606     }
607     build.append(']');
608     return build.toString();
609   }
610 
611   @Override
anyNull()612   public boolean anyNull() {
613     return BitSetMethods.anySet(baseObject, baseOffset, bitSetWidthInBytes / 8);
614   }
615 
616   /**
617    * Writes the content of this row into a memory address, identified by an object and an offset.
618    * The target memory address must already been allocated, and have enough space to hold all the
619    * bytes in this string.
620    */
writeToMemory(Object target, long targetOffset)621   public void writeToMemory(Object target, long targetOffset) {
622     Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes);
623   }
624 
writeTo(ByteBuffer buffer)625   public void writeTo(ByteBuffer buffer) {
626     assert (buffer.hasArray());
627     byte[] target = buffer.array();
628     int offset = buffer.arrayOffset();
629     int pos = buffer.position();
630     writeToMemory(target, Platform.BYTE_ARRAY_OFFSET + offset + pos);
631     buffer.position(pos + sizeInBytes);
632   }
633 
634   /**
635    * Write the bytes of var-length field into ByteBuffer
636    *
637    * Note: only work with HeapByteBuffer
638    */
writeFieldTo(int ordinal, ByteBuffer buffer)639   public void writeFieldTo(int ordinal, ByteBuffer buffer) {
640     final long offsetAndSize = getLong(ordinal);
641     final int offset = (int) (offsetAndSize >> 32);
642     final int size = (int) offsetAndSize;
643 
644     buffer.putInt(size);
645     int pos = buffer.position();
646     buffer.position(pos + size);
647     Platform.copyMemory(
648       baseObject,
649       baseOffset + offset,
650       buffer.array(),
651       Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + pos,
652       size);
653   }
654 
655   @Override
writeExternal(ObjectOutput out)656   public void writeExternal(ObjectOutput out) throws IOException {
657     byte[] bytes = getBytes();
658     out.writeInt(bytes.length);
659     out.writeInt(this.numFields);
660     out.write(bytes);
661   }
662 
663   @Override
readExternal(ObjectInput in)664   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
665     this.baseOffset = BYTE_ARRAY_OFFSET;
666     this.sizeInBytes = in.readInt();
667     this.numFields = in.readInt();
668     this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
669     this.baseObject = new byte[sizeInBytes];
670     in.readFully((byte[]) baseObject);
671   }
672 
673   @Override
write(Kryo kryo, Output out)674   public void write(Kryo kryo, Output out) {
675     byte[] bytes = getBytes();
676     out.writeInt(bytes.length);
677     out.writeInt(this.numFields);
678     out.write(bytes);
679   }
680 
681   @Override
read(Kryo kryo, Input in)682   public void read(Kryo kryo, Input in) {
683     this.baseOffset = BYTE_ARRAY_OFFSET;
684     this.sizeInBytes = in.readInt();
685     this.numFields = in.readInt();
686     this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
687     this.baseObject = new byte[sizeInBytes];
688     in.read((byte[]) baseObject);
689   }
690 }
691