1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.util
19
20import java.io.InputStream
21import java.nio.ByteBuffer
22
23import org.apache.spark.storage.StorageUtils
24
/**
 * An [[java.io.InputStream]] that reads its bytes from a [[java.nio.ByteBuffer]].
 *
 * Reading and skipping advance the position of the wrapped buffer itself. When a read or a
 * skip finds that no bytes remain, the stream drops its reference to the buffer; from then on
 * every read returns -1 and every skip returns 0.
 *
 * @param buffer the buffer to read from; a null buffer behaves like an exhausted stream
 */
private[spark]
class ByteBufferInputStream(private var buffer: ByteBuffer)
  extends InputStream {

  /**
   * Reads the next byte of the buffer.
   *
   * @return the byte as an unsigned value in the range 0-255, or -1 if no bytes remain
   */
  override def read(): Int = {
    if (buffer == null || buffer.remaining() == 0) {
      cleanUp()
      -1
    } else {
      // Mask so that negative bytes are reported as 128-255 rather than as negative ints.
      buffer.get() & 0xFF
    }
  }

  /**
   * Reads up to `dest.length` bytes into `dest`, starting at index 0.
   *
   * @return the number of bytes copied, or -1 if no bytes remain
   */
  override def read(dest: Array[Byte]): Int = {
    read(dest, 0, dest.length)
  }

  /**
   * Reads up to `length` bytes into `dest`, starting at index `offset`.
   *
   * @param dest   the array to copy bytes into
   * @param offset the index in `dest` at which the first byte is written
   * @param length the maximum number of bytes to copy
   * @return the number of bytes copied, or -1 if no bytes remain
   */
  override def read(dest: Array[Byte], offset: Int, length: Int): Int = {
    if (buffer == null || buffer.remaining() == 0) {
      cleanUp()
      -1
    } else {
      val amountToGet = math.min(buffer.remaining(), length)
      buffer.get(dest, offset, amountToGet)
      amountToGet
    }
  }

  /**
   * Skips over up to `bytes` bytes of the buffer.
   *
   * @param bytes the number of bytes to skip; zero or a negative value skips nothing
   * @return the number of bytes actually skipped, which is never negative
   */
  override def skip(bytes: Long): Long = {
    if (buffer != null) {
      // Clamp to [0, remaining]: a negative request must not move the position backwards
      // (or throw when the position would drop below zero).
      val amountToSkip = math.max(0L, math.min(bytes, buffer.remaining().toLong)).toInt
      buffer.position(buffer.position() + amountToSkip)
      if (buffer.remaining() == 0) {
        cleanUp()
      }
      amountToSkip.toLong
    } else {
      0L
    }
  }

  /**
   * Drops this stream's reference to the buffer. The buffer itself is not disposed of here.
   */
  private def cleanUp(): Unit = {
    buffer = null
  }
}
78