1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package org.apache.spark.sql.catalyst.expressions; 19 20 import java.io.*; 21 import java.math.BigDecimal; 22 import java.math.BigInteger; 23 import java.nio.ByteBuffer; 24 import java.util.Arrays; 25 import java.util.Collections; 26 import java.util.HashSet; 27 import java.util.Set; 28 29 import com.esotericsoftware.kryo.Kryo; 30 import com.esotericsoftware.kryo.KryoSerializable; 31 import com.esotericsoftware.kryo.io.Input; 32 import com.esotericsoftware.kryo.io.Output; 33 34 import org.apache.spark.sql.catalyst.InternalRow; 35 import org.apache.spark.sql.types.*; 36 import org.apache.spark.unsafe.Platform; 37 import org.apache.spark.unsafe.array.ByteArrayMethods; 38 import org.apache.spark.unsafe.bitset.BitSetMethods; 39 import org.apache.spark.unsafe.hash.Murmur3_x86_32; 40 import org.apache.spark.unsafe.types.CalendarInterval; 41 import org.apache.spark.unsafe.types.UTF8String; 42 43 import static org.apache.spark.sql.types.DataTypes.*; 44 import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET; 45 46 /** 47 * An Unsafe implementation of Row which is backed by raw memory instead of Java objects. 48 * 49 * Each tuple has three parts: [null bit set] [values] [variable length portion] 50 * 51 * The bit set is used for null tracking and is aligned to 8-byte word boundaries. It stores 52 * one bit per field. 53 * 54 * In the `values` region, we store one 8-byte word per field. For fields that hold fixed-length 55 * primitive types, such as long, double, or int, we store the value directly in the word. For 56 * fields with non-primitive or variable-length values, we store a relative offset (w.r.t. the 57 * base address of the row) that points to the beginning of the variable-length field, and length 58 * (they are combined into a long). 59 * 60 * Instances of `UnsafeRow` act as pointers to row data stored in this format. 61 */ 62 public final class UnsafeRow extends InternalRow implements Externalizable, KryoSerializable { 63 64 ////////////////////////////////////////////////////////////////////////////// 65 // Static methods 66 ////////////////////////////////////////////////////////////////////////////// 67 calculateBitSetWidthInBytes(int numFields)68 public static int calculateBitSetWidthInBytes(int numFields) { 69 return ((numFields + 63)/ 64) * 8; 70 } 71 calculateFixedPortionByteSize(int numFields)72 public static int calculateFixedPortionByteSize(int numFields) { 73 return 8 * numFields + calculateBitSetWidthInBytes(numFields); 74 } 75 76 /** 77 * Field types that can be updated in place in UnsafeRows (e.g. we support set() for these types) 78 */ 79 public static final Set<DataType> mutableFieldTypes; 80 81 // DecimalType is also mutable 82 static { 83 mutableFieldTypes = Collections.unmodifiableSet( 84 new HashSet<>( 85 Arrays.asList(new DataType[] { 86 NullType, 87 BooleanType, 88 ByteType, 89 ShortType, 90 IntegerType, 91 LongType, 92 FloatType, 93 DoubleType, 94 DateType, 95 TimestampType 96 }))); 97 } 98 isFixedLength(DataType dt)99 public static boolean isFixedLength(DataType dt) { 100 if (dt instanceof DecimalType) { 101 return ((DecimalType) dt).precision() <= Decimal.MAX_LONG_DIGITS(); 102 } else { 103 return mutableFieldTypes.contains(dt); 104 } 105 } 106 isMutable(DataType dt)107 public static boolean isMutable(DataType dt) { 108 return mutableFieldTypes.contains(dt) || dt instanceof DecimalType; 109 } 110 111 ////////////////////////////////////////////////////////////////////////////// 112 // Private fields and methods 113 ////////////////////////////////////////////////////////////////////////////// 114 115 private Object baseObject; 116 private long baseOffset; 117 118 /** The number of fields in this row, used for calculating the bitset width (and in assertions) */ 119 private int numFields; 120 121 /** The size of this row's backing data, in bytes) */ 122 private int sizeInBytes; 123 124 /** The width of the null tracking bit set, in bytes */ 125 private int bitSetWidthInBytes; 126 getFieldOffset(int ordinal)127 private long getFieldOffset(int ordinal) { 128 return baseOffset + bitSetWidthInBytes + ordinal * 8L; 129 } 130 assertIndexIsValid(int index)131 private void assertIndexIsValid(int index) { 132 assert index >= 0 : "index (" + index + ") should >= 0"; 133 assert index < numFields : "index (" + index + ") should < " + numFields; 134 } 135 136 ////////////////////////////////////////////////////////////////////////////// 137 // Public methods 138 ////////////////////////////////////////////////////////////////////////////// 139 140 /** 141 * Construct a new UnsafeRow. The resulting row won't be usable until `pointTo()` has been called, 142 * since the value returned by this constructor is equivalent to a null pointer. 143 * 144 * @param numFields the number of fields in this row 145 */ 146 public UnsafeRow(int numFields) { 147 this.numFields = numFields; 148 this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields); 149 } 150 151 // for serializer 152 public UnsafeRow() {} 153 154 public Object getBaseObject() { return baseObject; } 155 public long getBaseOffset() { return baseOffset; } 156 public int getSizeInBytes() { return sizeInBytes; } 157 158 @Override 159 public int numFields() { return numFields; } 160 161 /** 162 * Update this UnsafeRow to point to different backing data. 163 * 164 * @param baseObject the base object 165 * @param baseOffset the offset within the base object 166 * @param sizeInBytes the size of this row's backing data, in bytes 167 */ 168 public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) { 169 assert numFields >= 0 : "numFields (" + numFields + ") should >= 0"; 170 this.baseObject = baseObject; 171 this.baseOffset = baseOffset; 172 this.sizeInBytes = sizeInBytes; 173 } 174 175 /** 176 * Update this UnsafeRow to point to the underlying byte array. 177 * 178 * @param buf byte array to point to 179 * @param sizeInBytes the number of bytes valid in the byte array 180 */ pointTo(byte[] buf, int sizeInBytes)181 public void pointTo(byte[] buf, int sizeInBytes) { 182 pointTo(buf, Platform.BYTE_ARRAY_OFFSET, sizeInBytes); 183 } 184 setTotalSize(int sizeInBytes)185 public void setTotalSize(int sizeInBytes) { 186 this.sizeInBytes = sizeInBytes; 187 } 188 setNotNullAt(int i)189 public void setNotNullAt(int i) { 190 assertIndexIsValid(i); 191 BitSetMethods.unset(baseObject, baseOffset, i); 192 } 193 194 @Override setNullAt(int i)195 public void setNullAt(int i) { 196 assertIndexIsValid(i); 197 BitSetMethods.set(baseObject, baseOffset, i); 198 // To preserve row equality, zero out the value when setting the column to null. 199 // Since this row does does not currently support updates to variable-length values, we don't 200 // have to worry about zeroing out that data. 201 Platform.putLong(baseObject, getFieldOffset(i), 0); 202 } 203 204 @Override update(int ordinal, Object value)205 public void update(int ordinal, Object value) { 206 throw new UnsupportedOperationException(); 207 } 208 209 @Override setInt(int ordinal, int value)210 public void setInt(int ordinal, int value) { 211 assertIndexIsValid(ordinal); 212 setNotNullAt(ordinal); 213 Platform.putInt(baseObject, getFieldOffset(ordinal), value); 214 } 215 216 @Override setLong(int ordinal, long value)217 public void setLong(int ordinal, long value) { 218 assertIndexIsValid(ordinal); 219 setNotNullAt(ordinal); 220 Platform.putLong(baseObject, getFieldOffset(ordinal), value); 221 } 222 223 @Override setDouble(int ordinal, double value)224 public void setDouble(int ordinal, double value) { 225 assertIndexIsValid(ordinal); 226 setNotNullAt(ordinal); 227 if (Double.isNaN(value)) { 228 value = Double.NaN; 229 } 230 Platform.putDouble(baseObject, getFieldOffset(ordinal), value); 231 } 232 233 @Override setBoolean(int ordinal, boolean value)234 public void setBoolean(int ordinal, boolean value) { 235 assertIndexIsValid(ordinal); 236 setNotNullAt(ordinal); 237 Platform.putBoolean(baseObject, getFieldOffset(ordinal), value); 238 } 239 240 @Override setShort(int ordinal, short value)241 public void setShort(int ordinal, short value) { 242 assertIndexIsValid(ordinal); 243 setNotNullAt(ordinal); 244 Platform.putShort(baseObject, getFieldOffset(ordinal), value); 245 } 246 247 @Override setByte(int ordinal, byte value)248 public void setByte(int ordinal, byte value) { 249 assertIndexIsValid(ordinal); 250 setNotNullAt(ordinal); 251 Platform.putByte(baseObject, getFieldOffset(ordinal), value); 252 } 253 254 @Override setFloat(int ordinal, float value)255 public void setFloat(int ordinal, float value) { 256 assertIndexIsValid(ordinal); 257 setNotNullAt(ordinal); 258 if (Float.isNaN(value)) { 259 value = Float.NaN; 260 } 261 Platform.putFloat(baseObject, getFieldOffset(ordinal), value); 262 } 263 264 /** 265 * Updates the decimal column. 266 * 267 * Note: In order to support update a decimal with precision > 18, CAN NOT call 268 * setNullAt() for this column. 269 */ 270 @Override setDecimal(int ordinal, Decimal value, int precision)271 public void setDecimal(int ordinal, Decimal value, int precision) { 272 assertIndexIsValid(ordinal); 273 if (precision <= Decimal.MAX_LONG_DIGITS()) { 274 // compact format 275 if (value == null) { 276 setNullAt(ordinal); 277 } else { 278 setLong(ordinal, value.toUnscaledLong()); 279 } 280 } else { 281 // fixed length 282 long cursor = getLong(ordinal) >>> 32; 283 assert cursor > 0 : "invalid cursor " + cursor; 284 // zero-out the bytes 285 Platform.putLong(baseObject, baseOffset + cursor, 0L); 286 Platform.putLong(baseObject, baseOffset + cursor + 8, 0L); 287 288 if (value == null) { 289 setNullAt(ordinal); 290 // keep the offset for future update 291 Platform.putLong(baseObject, getFieldOffset(ordinal), cursor << 32); 292 } else { 293 294 final BigInteger integer = value.toJavaBigDecimal().unscaledValue(); 295 byte[] bytes = integer.toByteArray(); 296 assert(bytes.length <= 16); 297 298 // Write the bytes to the variable length portion. 299 Platform.copyMemory( 300 bytes, Platform.BYTE_ARRAY_OFFSET, baseObject, baseOffset + cursor, bytes.length); 301 setLong(ordinal, (cursor << 32) | ((long) bytes.length)); 302 } 303 } 304 } 305 306 @Override get(int ordinal, DataType dataType)307 public Object get(int ordinal, DataType dataType) { 308 if (isNullAt(ordinal) || dataType instanceof NullType) { 309 return null; 310 } else if (dataType instanceof BooleanType) { 311 return getBoolean(ordinal); 312 } else if (dataType instanceof ByteType) { 313 return getByte(ordinal); 314 } else if (dataType instanceof ShortType) { 315 return getShort(ordinal); 316 } else if (dataType instanceof IntegerType) { 317 return getInt(ordinal); 318 } else if (dataType instanceof LongType) { 319 return getLong(ordinal); 320 } else if (dataType instanceof FloatType) { 321 return getFloat(ordinal); 322 } else if (dataType instanceof DoubleType) { 323 return getDouble(ordinal); 324 } else if (dataType instanceof DecimalType) { 325 DecimalType dt = (DecimalType) dataType; 326 return getDecimal(ordinal, dt.precision(), dt.scale()); 327 } else if (dataType instanceof DateType) { 328 return getInt(ordinal); 329 } else if (dataType instanceof TimestampType) { 330 return getLong(ordinal); 331 } else if (dataType instanceof BinaryType) { 332 return getBinary(ordinal); 333 } else if (dataType instanceof StringType) { 334 return getUTF8String(ordinal); 335 } else if (dataType instanceof CalendarIntervalType) { 336 return getInterval(ordinal); 337 } else if (dataType instanceof StructType) { 338 return getStruct(ordinal, ((StructType) dataType).size()); 339 } else if (dataType instanceof ArrayType) { 340 return getArray(ordinal); 341 } else if (dataType instanceof MapType) { 342 return getMap(ordinal); 343 } else if (dataType instanceof UserDefinedType) { 344 return get(ordinal, ((UserDefinedType)dataType).sqlType()); 345 } else { 346 throw new UnsupportedOperationException("Unsupported data type " + dataType.simpleString()); 347 } 348 } 349 350 @Override isNullAt(int ordinal)351 public boolean isNullAt(int ordinal) { 352 assertIndexIsValid(ordinal); 353 return BitSetMethods.isSet(baseObject, baseOffset, ordinal); 354 } 355 356 @Override getBoolean(int ordinal)357 public boolean getBoolean(int ordinal) { 358 assertIndexIsValid(ordinal); 359 return Platform.getBoolean(baseObject, getFieldOffset(ordinal)); 360 } 361 362 @Override getByte(int ordinal)363 public byte getByte(int ordinal) { 364 assertIndexIsValid(ordinal); 365 return Platform.getByte(baseObject, getFieldOffset(ordinal)); 366 } 367 368 @Override getShort(int ordinal)369 public short getShort(int ordinal) { 370 assertIndexIsValid(ordinal); 371 return Platform.getShort(baseObject, getFieldOffset(ordinal)); 372 } 373 374 @Override getInt(int ordinal)375 public int getInt(int ordinal) { 376 assertIndexIsValid(ordinal); 377 return Platform.getInt(baseObject, getFieldOffset(ordinal)); 378 } 379 380 @Override getLong(int ordinal)381 public long getLong(int ordinal) { 382 assertIndexIsValid(ordinal); 383 return Platform.getLong(baseObject, getFieldOffset(ordinal)); 384 } 385 386 @Override getFloat(int ordinal)387 public float getFloat(int ordinal) { 388 assertIndexIsValid(ordinal); 389 return Platform.getFloat(baseObject, getFieldOffset(ordinal)); 390 } 391 392 @Override getDouble(int ordinal)393 public double getDouble(int ordinal) { 394 assertIndexIsValid(ordinal); 395 return Platform.getDouble(baseObject, getFieldOffset(ordinal)); 396 } 397 398 @Override getDecimal(int ordinal, int precision, int scale)399 public Decimal getDecimal(int ordinal, int precision, int scale) { 400 if (isNullAt(ordinal)) { 401 return null; 402 } 403 if (precision <= Decimal.MAX_LONG_DIGITS()) { 404 return Decimal.createUnsafe(getLong(ordinal), precision, scale); 405 } else { 406 byte[] bytes = getBinary(ordinal); 407 BigInteger bigInteger = new BigInteger(bytes); 408 BigDecimal javaDecimal = new BigDecimal(bigInteger, scale); 409 return Decimal.apply(javaDecimal, precision, scale); 410 } 411 } 412 413 @Override getUTF8String(int ordinal)414 public UTF8String getUTF8String(int ordinal) { 415 if (isNullAt(ordinal)) return null; 416 final long offsetAndSize = getLong(ordinal); 417 final int offset = (int) (offsetAndSize >> 32); 418 final int size = (int) offsetAndSize; 419 return UTF8String.fromAddress(baseObject, baseOffset + offset, size); 420 } 421 422 @Override getBinary(int ordinal)423 public byte[] getBinary(int ordinal) { 424 if (isNullAt(ordinal)) { 425 return null; 426 } else { 427 final long offsetAndSize = getLong(ordinal); 428 final int offset = (int) (offsetAndSize >> 32); 429 final int size = (int) offsetAndSize; 430 final byte[] bytes = new byte[size]; 431 Platform.copyMemory( 432 baseObject, 433 baseOffset + offset, 434 bytes, 435 Platform.BYTE_ARRAY_OFFSET, 436 size 437 ); 438 return bytes; 439 } 440 } 441 442 @Override getInterval(int ordinal)443 public CalendarInterval getInterval(int ordinal) { 444 if (isNullAt(ordinal)) { 445 return null; 446 } else { 447 final long offsetAndSize = getLong(ordinal); 448 final int offset = (int) (offsetAndSize >> 32); 449 final int months = (int) Platform.getLong(baseObject, baseOffset + offset); 450 final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 8); 451 return new CalendarInterval(months, microseconds); 452 } 453 } 454 455 @Override getStruct(int ordinal, int numFields)456 public UnsafeRow getStruct(int ordinal, int numFields) { 457 if (isNullAt(ordinal)) { 458 return null; 459 } else { 460 final long offsetAndSize = getLong(ordinal); 461 final int offset = (int) (offsetAndSize >> 32); 462 final int size = (int) offsetAndSize; 463 final UnsafeRow row = new UnsafeRow(numFields); 464 row.pointTo(baseObject, baseOffset + offset, size); 465 return row; 466 } 467 } 468 469 @Override getArray(int ordinal)470 public UnsafeArrayData getArray(int ordinal) { 471 if (isNullAt(ordinal)) { 472 return null; 473 } else { 474 final long offsetAndSize = getLong(ordinal); 475 final int offset = (int) (offsetAndSize >> 32); 476 final int size = (int) offsetAndSize; 477 final UnsafeArrayData array = new UnsafeArrayData(); 478 array.pointTo(baseObject, baseOffset + offset, size); 479 return array; 480 } 481 } 482 483 @Override getMap(int ordinal)484 public UnsafeMapData getMap(int ordinal) { 485 if (isNullAt(ordinal)) { 486 return null; 487 } else { 488 final long offsetAndSize = getLong(ordinal); 489 final int offset = (int) (offsetAndSize >> 32); 490 final int size = (int) offsetAndSize; 491 final UnsafeMapData map = new UnsafeMapData(); 492 map.pointTo(baseObject, baseOffset + offset, size); 493 return map; 494 } 495 } 496 497 /** 498 * Copies this row, returning a self-contained UnsafeRow that stores its data in an internal 499 * byte array rather than referencing data stored in a data page. 500 */ 501 @Override copy()502 public UnsafeRow copy() { 503 UnsafeRow rowCopy = new UnsafeRow(numFields); 504 final byte[] rowDataCopy = new byte[sizeInBytes]; 505 Platform.copyMemory( 506 baseObject, 507 baseOffset, 508 rowDataCopy, 509 Platform.BYTE_ARRAY_OFFSET, 510 sizeInBytes 511 ); 512 rowCopy.pointTo(rowDataCopy, Platform.BYTE_ARRAY_OFFSET, sizeInBytes); 513 return rowCopy; 514 } 515 516 /** 517 * Creates an empty UnsafeRow from a byte array with specified numBytes and numFields. 518 * The returned row is invalid until we call copyFrom on it. 519 */ createFromByteArray(int numBytes, int numFields)520 public static UnsafeRow createFromByteArray(int numBytes, int numFields) { 521 final UnsafeRow row = new UnsafeRow(numFields); 522 row.pointTo(new byte[numBytes], numBytes); 523 return row; 524 } 525 526 /** 527 * Copies the input UnsafeRow to this UnsafeRow, and resize the underlying byte[] when the 528 * input row is larger than this row. 529 */ copyFrom(UnsafeRow row)530 public void copyFrom(UnsafeRow row) { 531 // copyFrom is only available for UnsafeRow created from byte array. 532 assert (baseObject instanceof byte[]) && baseOffset == Platform.BYTE_ARRAY_OFFSET; 533 if (row.sizeInBytes > this.sizeInBytes) { 534 // resize the underlying byte[] if it's not large enough. 535 this.baseObject = new byte[row.sizeInBytes]; 536 } 537 Platform.copyMemory( 538 row.baseObject, row.baseOffset, this.baseObject, this.baseOffset, row.sizeInBytes); 539 // update the sizeInBytes. 540 this.sizeInBytes = row.sizeInBytes; 541 } 542 543 /** 544 * Write this UnsafeRow's underlying bytes to the given OutputStream. 545 * 546 * @param out the stream to write to. 547 * @param writeBuffer a byte array for buffering chunks of off-heap data while writing to the 548 * output stream. If this row is backed by an on-heap byte array, then this 549 * buffer will not be used and may be null. 550 */ writeToStream(OutputStream out, byte[] writeBuffer)551 public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException { 552 if (baseObject instanceof byte[]) { 553 int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset); 554 out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes); 555 } else { 556 int dataRemaining = sizeInBytes; 557 long rowReadPosition = baseOffset; 558 while (dataRemaining > 0) { 559 int toTransfer = Math.min(writeBuffer.length, dataRemaining); 560 Platform.copyMemory( 561 baseObject, rowReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer); 562 out.write(writeBuffer, 0, toTransfer); 563 rowReadPosition += toTransfer; 564 dataRemaining -= toTransfer; 565 } 566 } 567 } 568 569 @Override hashCode()570 public int hashCode() { 571 return Murmur3_x86_32.hashUnsafeWords(baseObject, baseOffset, sizeInBytes, 42); 572 } 573 574 @Override equals(Object other)575 public boolean equals(Object other) { 576 if (other instanceof UnsafeRow) { 577 UnsafeRow o = (UnsafeRow) other; 578 return (sizeInBytes == o.sizeInBytes) && 579 ByteArrayMethods.arrayEquals(baseObject, baseOffset, o.baseObject, o.baseOffset, 580 sizeInBytes); 581 } 582 return false; 583 } 584 585 /** 586 * Returns the underlying bytes for this UnsafeRow. 587 */ getBytes()588 public byte[] getBytes() { 589 if (baseObject instanceof byte[] && baseOffset == Platform.BYTE_ARRAY_OFFSET 590 && (((byte[]) baseObject).length == sizeInBytes)) { 591 return (byte[]) baseObject; 592 } else { 593 byte[] bytes = new byte[sizeInBytes]; 594 Platform.copyMemory(baseObject, baseOffset, bytes, Platform.BYTE_ARRAY_OFFSET, sizeInBytes); 595 return bytes; 596 } 597 } 598 599 // This is for debugging 600 @Override toString()601 public String toString() { 602 StringBuilder build = new StringBuilder("["); 603 for (int i = 0; i < sizeInBytes; i += 8) { 604 if (i != 0) build.append(','); 605 build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i))); 606 } 607 build.append(']'); 608 return build.toString(); 609 } 610 611 @Override anyNull()612 public boolean anyNull() { 613 return BitSetMethods.anySet(baseObject, baseOffset, bitSetWidthInBytes / 8); 614 } 615 616 /** 617 * Writes the content of this row into a memory address, identified by an object and an offset. 618 * The target memory address must already been allocated, and have enough space to hold all the 619 * bytes in this string. 620 */ writeToMemory(Object target, long targetOffset)621 public void writeToMemory(Object target, long targetOffset) { 622 Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes); 623 } 624 writeTo(ByteBuffer buffer)625 public void writeTo(ByteBuffer buffer) { 626 assert (buffer.hasArray()); 627 byte[] target = buffer.array(); 628 int offset = buffer.arrayOffset(); 629 int pos = buffer.position(); 630 writeToMemory(target, Platform.BYTE_ARRAY_OFFSET + offset + pos); 631 buffer.position(pos + sizeInBytes); 632 } 633 634 /** 635 * Write the bytes of var-length field into ByteBuffer 636 * 637 * Note: only work with HeapByteBuffer 638 */ writeFieldTo(int ordinal, ByteBuffer buffer)639 public void writeFieldTo(int ordinal, ByteBuffer buffer) { 640 final long offsetAndSize = getLong(ordinal); 641 final int offset = (int) (offsetAndSize >> 32); 642 final int size = (int) offsetAndSize; 643 644 buffer.putInt(size); 645 int pos = buffer.position(); 646 buffer.position(pos + size); 647 Platform.copyMemory( 648 baseObject, 649 baseOffset + offset, 650 buffer.array(), 651 Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + pos, 652 size); 653 } 654 655 @Override writeExternal(ObjectOutput out)656 public void writeExternal(ObjectOutput out) throws IOException { 657 byte[] bytes = getBytes(); 658 out.writeInt(bytes.length); 659 out.writeInt(this.numFields); 660 out.write(bytes); 661 } 662 663 @Override readExternal(ObjectInput in)664 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 665 this.baseOffset = BYTE_ARRAY_OFFSET; 666 this.sizeInBytes = in.readInt(); 667 this.numFields = in.readInt(); 668 this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields); 669 this.baseObject = new byte[sizeInBytes]; 670 in.readFully((byte[]) baseObject); 671 } 672 673 @Override write(Kryo kryo, Output out)674 public void write(Kryo kryo, Output out) { 675 byte[] bytes = getBytes(); 676 out.writeInt(bytes.length); 677 out.writeInt(this.numFields); 678 out.write(bytes); 679 } 680 681 @Override read(Kryo kryo, Input in)682 public void read(Kryo kryo, Input in) { 683 this.baseOffset = BYTE_ARRAY_OFFSET; 684 this.sizeInBytes = in.readInt(); 685 this.numFields = in.readInt(); 686 this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields); 687 this.baseObject = new byte[sizeInBytes]; 688 in.read((byte[]) baseObject); 689 } 690 } 691