1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package org.apache.spark.util 19 20import java.io.InputStream 21import java.nio.ByteBuffer 22 23import org.apache.spark.storage.StorageUtils 24 25/** 26 * Reads data from a ByteBuffer. 27 */ 28private[spark] 29class ByteBufferInputStream(private var buffer: ByteBuffer) 30 extends InputStream { 31 32 override def read(): Int = { 33 if (buffer == null || buffer.remaining() == 0) { 34 cleanUp() 35 -1 36 } else { 37 buffer.get() & 0xFF 38 } 39 } 40 41 override def read(dest: Array[Byte]): Int = { 42 read(dest, 0, dest.length) 43 } 44 45 override def read(dest: Array[Byte], offset: Int, length: Int): Int = { 46 if (buffer == null || buffer.remaining() == 0) { 47 cleanUp() 48 -1 49 } else { 50 val amountToGet = math.min(buffer.remaining(), length) 51 buffer.get(dest, offset, amountToGet) 52 amountToGet 53 } 54 } 55 56 override def skip(bytes: Long): Long = { 57 if (buffer != null) { 58 val amountToSkip = math.min(bytes, buffer.remaining).toInt 59 buffer.position(buffer.position + amountToSkip) 60 if (buffer.remaining() == 0) { 61 cleanUp() 62 } 63 amountToSkip 64 } else { 65 0L 66 } 67 } 68 69 /** 70 * Clean up the buffer, and potentially dispose of it using StorageUtils.dispose(). 71 */ 72 private def cleanUp() { 73 if (buffer != null) { 74 buffer = null 75 } 76 } 77} 78