/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util.io

import java.io.OutputStream
import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageUtils

/**
 * An OutputStream that writes to fixed-size chunks of byte buffers.
 *
 * Chunks are allocated lazily, one at a time, as data is written. Once the stream has been
 * closed, [[toChunkedByteBuffer]] turns the written chunks into a [[ChunkedByteBuffer]].
 *
 * @param chunkSize size of each chunk, in bytes. Must be positive.
 * @param allocator function used to allocate a buffer with the requested capacity, in bytes.
 */
private[spark] class ChunkedByteBufferOutputStream(
    chunkSize: Int,
    allocator: Int => ByteBuffer)
  extends OutputStream {

  // A chunk size of 0 could never hold data: write(bytes, off, len) would keep allocating empty
  // chunks forever. A negative size would be passed straight to the allocator.
  require(chunkSize > 0, s"chunkSize must be positive, but got $chunkSize")

  private[this] var toChunkedByteBufferWasCalled = false

  private val chunks = new ArrayBuffer[ByteBuffer]

  /** Index of the last chunk. Starting with -1 when the chunks array is empty. */
  private[this] var lastChunkIndex = -1

  /**
   * Next position to write in the last chunk.
   *
   * If this equals chunkSize, it means for next write we need to allocate a new chunk.
   * Outside of a write call this is never 0: a freshly allocated chunk always receives at least
   * one byte before the writing method returns.
   */
  private[this] var position = chunkSize

  /** Total bytes written. Kept as a Long so that streams larger than 2 GiB do not overflow. */
  private[this] var _size = 0L

  private[this] var closed: Boolean = false

  /** Total number of bytes written to this stream so far. */
  def size: Long = _size

  /** Marks the stream as closed. Calling this more than once has no further effect. */
  override def close(): Unit = {
    if (!closed) {
      super.close()
      closed = true
    }
  }

  /** Writes the low-order byte of `b`, allocating a new chunk first if the current one is full. */
  override def write(b: Int): Unit = {
    require(!closed, "cannot write to a closed ChunkedByteBufferOutputStream")
    allocateNewChunkIfNeeded()
    chunks(lastChunkIndex).put(b.toByte)
    position += 1
    _size += 1
  }

  /**
   * Writes `len` bytes from `bytes`, starting at offset `off`, spreading them over as many
   * chunks as needed.
   *
   * @throws IndexOutOfBoundsException if `off` or `len` is negative, or `off + len` exceeds
   *                                   `bytes.length`. The check runs before anything is
   *                                   written, so a rejected call leaves the stream unchanged.
   */
  override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
    require(!closed, "cannot write to a closed ChunkedByteBufferOutputStream")
    // Validate up front, per the OutputStream contract. A negative len would otherwise skip the
    // loop yet still decrement `_size`. A range that runs past the array would otherwise fail
    // only after some batches were written. `len > bytes.length - off` avoids the Int overflow
    // that computing `off + len` could hit.
    if (off < 0 || len < 0 || len > bytes.length - off) {
      throw new IndexOutOfBoundsException(
        s"off=$off, len=$len, bytes.length=${bytes.length}")
    }
    var written = 0
    while (written < len) {
      allocateNewChunkIfNeeded()
      val thisBatch = math.min(chunkSize - position, len - written)
      chunks(lastChunkIndex).put(bytes, off + written, thisBatch)
      written += thisBatch
      position += thisBatch
    }
    _size += len
  }

  /** Appends a fresh chunk from `allocator` when the current last chunk is full (or absent). */
  @inline
  private def allocateNewChunkIfNeeded(): Unit = {
    if (position == chunkSize) {
      chunks += allocator(chunkSize)
      lastChunkIndex += 1
      position = 0
    }
  }

  /**
   * Converts the written data into a [[ChunkedByteBuffer]] whose chunks are flipped and ready
   * to be read.
   *
   * May only be called once, and only after [[close]]. Full chunks are handed over as-is. A
   * partially filled last chunk is copied into a right-sized buffer from `allocator`, and the
   * original is then released with `StorageUtils.dispose`.
   */
  def toChunkedByteBuffer: ChunkedByteBuffer = {
    require(closed, "cannot call toChunkedByteBuffer() unless close() has been called")
    require(!toChunkedByteBufferWasCalled, "toChunkedByteBuffer() can only be called once")
    toChunkedByteBufferWasCalled = true
    if (lastChunkIndex == -1) {
      new ChunkedByteBuffer(Array.empty[ByteBuffer])
    } else {
      // Hand the first n-1 chunks over as-is (no copy), then create a buffer that exactly fits
      // the data in the last chunk. An alternative would have been returning an array of
      // ByteBuffers, with the last buffer bounded to only the last chunk's position. However,
      // given our use case in Spark (to put the chunks in block manager), only limiting the view
      // bound of the buffer would still require the block manager to store the whole chunk.
      val ret = new Array[ByteBuffer](chunks.size)
      for (i <- 0 until lastChunkIndex) {
        ret(i) = chunks(i)
        ret(i).flip()
      }
      val lastChunk = chunks(lastChunkIndex)
      if (position == chunkSize) {
        // The last chunk is completely full, so it can be used directly.
        lastChunk.flip()
        ret(lastChunkIndex) = lastChunk
      } else {
        val trimmed = allocator(position)
        lastChunk.flip()
        trimmed.put(lastChunk)
        trimmed.flip()
        ret(lastChunkIndex) = trimmed
        StorageUtils.dispose(lastChunk)
      }
      new ChunkedByteBuffer(ret)
    }
  }
}