1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.util.io
19
20import java.io.OutputStream
21import java.nio.ByteBuffer
22
23import scala.collection.mutable.ArrayBuffer
24
25import org.apache.spark.storage.StorageUtils
26
/**
 * An OutputStream that writes to fixed-size chunks of byte arrays.
 *
 * Written bytes are accumulated in a growing list of [[ByteBuffer]]s obtained from `allocator`.
 * After the stream has been closed, the accumulated data can be retrieved exactly once through
 * [[toChunkedByteBuffer]].
 *
 * @param chunkSize size of each chunk, in bytes. NOTE(review): a non-positive value can only
 *                  work if nothing is ever written to the stream.
 * @param allocator function that is called with a requested size in bytes and returns the
 *                  buffer to write into. This class assumes the returned buffer is positioned
 *                  at 0 and has at least the requested number of bytes remaining.
 */
private[spark] class ChunkedByteBufferOutputStream(
    chunkSize: Int,
    allocator: Int => ByteBuffer)
  extends OutputStream {

  /** Guards against handing out the underlying buffers more than once. */
  private[this] var toChunkedByteBufferWasCalled = false

  private val chunks = new ArrayBuffer[ByteBuffer]

  /** Index of the last chunk. Starting with -1 when the chunks array is empty. */
  private[this] var lastChunkIndex = -1

  /**
   * Next position to write in the last chunk.
   *
   * If this equals chunkSize, it means for next write we need to allocate a new chunk.
   * Once a write call has returned this is never 0, because it is only reset to 0 immediately
   * before at least one byte is stored in the freshly allocated chunk.
   */
  private[this] var position = chunkSize

  /**
   * Total number of bytes written so far. Kept as a Long because the stream can span many
   * chunks and therefore hold more than Int.MaxValue bytes; an Int counter would wrap around.
   */
  private[this] var _size = 0L
  private[this] var closed: Boolean = false

  /** Total number of bytes written to this stream so far. */
  def size: Long = _size

  /** Marks the stream as closed. Calling this more than once has no further effect. */
  override def close(): Unit = {
    if (!closed) {
      super.close()
      closed = true
    }
  }

  /**
   * Writes the low-order byte of `b`, allocating a new chunk first if the current one is full.
   *
   * @throws IllegalArgumentException if the stream has already been closed.
   */
  override def write(b: Int): Unit = {
    require(!closed, "cannot write to a closed ChunkedByteBufferOutputStream")
    allocateNewChunkIfNeeded()
    chunks(lastChunkIndex).put(b.toByte)
    position += 1
    _size += 1
  }

  /**
   * Writes `len` bytes of `bytes`, starting at index `off`, spilling over into newly allocated
   * chunks as needed.
   *
   * @throws IllegalArgumentException if the stream has already been closed.
   * @throws IndexOutOfBoundsException if `off` or `len` is negative, or if `off + len` exceeds
   *                                   the length of `bytes`.
   */
  override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
    require(!closed, "cannot write to a closed ChunkedByteBufferOutputStream")
    // Validate the whole range before touching any state. Otherwise a negative `len` would
    // silently shrink the reported size, and an over-long range would fail only after part of
    // the data had already been stored. `bytes.length - off` cannot overflow because off >= 0.
    if (off < 0 || len < 0 || len > bytes.length - off) {
      throw new IndexOutOfBoundsException(
        s"offset $off and length $len are out of bounds for an array of length ${bytes.length}")
    }
    var written = 0
    while (written < len) {
      allocateNewChunkIfNeeded()
      // Fill whatever is left of the current chunk, or write everything that remains.
      val thisBatch = math.min(chunkSize - position, len - written)
      chunks(lastChunkIndex).put(bytes, written + off, thisBatch)
      written += thisBatch
      position += thisBatch
      // Count per batch so the size stays in sync with the stored data even if a later
      // allocation fails.
      _size += thisBatch
    }
  }

  /** Appends a fresh chunk and resets the write position when the last chunk is full. */
  @inline
  private def allocateNewChunkIfNeeded(): Unit = {
    if (position == chunkSize) {
      chunks += allocator(chunkSize)
      lastChunkIndex += 1
      position = 0
    }
  }

  /**
   * Returns the written data as a [[ChunkedByteBuffer]] whose buffers are flipped for reading.
   *
   * @throws IllegalArgumentException if close() has not been called yet, or if this method has
   *                                  already been called once.
   */
  def toChunkedByteBuffer: ChunkedByteBuffer = {
    require(closed, "cannot call toChunkedByteBuffer() unless close() has been called")
    require(!toChunkedByteBufferWasCalled, "toChunkedByteBuffer() can only be called once")
    toChunkedByteBufferWasCalled = true
    if (lastChunkIndex == -1) {
      // Nothing was ever written.
      new ChunkedByteBuffer(Array.empty[ByteBuffer])
    } else {
      // Copy the first n-1 chunks to the output, and then create an array that fits the last chunk.
      // An alternative would have been returning an array of ByteBuffers, with the last buffer
      // bounded to only the last chunk's position. However, given our use case in Spark (to put
      // the chunks in block manager), only limiting the view bound of the buffer would still
      // require the block manager to store the whole chunk.
      val ret = new Array[ByteBuffer](chunks.size)
      for (i <- 0 until chunks.size - 1) {
        ret(i) = chunks(i)
        ret(i).flip()
      }
      val lastChunk = chunks(lastChunkIndex)
      if (position == chunkSize) {
        // The last chunk is completely full, so it can be handed out as is.
        lastChunk.flip()
        ret(lastChunkIndex) = lastChunk
      } else {
        // The last chunk is only partially filled: copy its contents into a buffer of exactly
        // `position` bytes and pass the oversized original to StorageUtils.dispose.
        val trimmed = allocator(position)
        lastChunk.flip()
        trimmed.put(lastChunk)
        trimmed.flip()
        ret(lastChunkIndex) = trimmed
        StorageUtils.dispose(lastChunk)
      }
      new ChunkedByteBuffer(ret)
    }
  }
}
124