/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.executor

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.LongAccumulator


/**
 * :: DeveloperApi ::
 * Accumulator-backed metrics describing the shuffle data written by a task.
 * Each metric is held in its own [[LongAccumulator]]; the public accessors expose
 * the accumulator's current sum.
 *
 * Operations are not thread-safe.
 */
@DeveloperApi
class ShuffleWriteMetrics private[spark] () extends Serializable {
  private[executor] val _bytesWritten = new LongAccumulator
  private[executor] val _recordsWritten = new LongAccumulator
  private[executor] val _writeTime = new LongAccumulator

  /**
   * Number of bytes this task has written for the shuffle.
   */
  def bytesWritten: Long = _bytesWritten.sum

  /**
   * Total number of records this task has written to the shuffle.
   */
  def recordsWritten: Long = _recordsWritten.sum

  /**
   * Time, in nanoseconds, the task spent blocking on writes to disk or buffer cache.
   */
  def writeTime: Long = _writeTime.sum

  /** Adds `v` to the bytes-written accumulator. */
  private[spark] def incBytesWritten(v: Long): Unit = {
    _bytesWritten.add(v)
  }

  /** Adds `v` to the records-written accumulator. */
  private[spark] def incRecordsWritten(v: Long): Unit = {
    _recordsWritten.add(v)
  }

  /** Adds `v` to the write-time accumulator. */
  private[spark] def incWriteTime(v: Long): Unit = {
    _writeTime.add(v)
  }

  /** Resets the bytes-written accumulator to its current sum minus `v`. */
  private[spark] def decBytesWritten(v: Long): Unit = {
    val remainingBytes = bytesWritten - v
    _bytesWritten.setValue(remainingBytes)
  }

  /** Resets the records-written accumulator to its current sum minus `v`. */
  private[spark] def decRecordsWritten(v: Long): Unit = {
    val remainingRecords = recordsWritten - v
    _recordsWritten.setValue(remainingRecords)
  }

  // Deprecated aliases retained for backward compatibility.
  // TODO: remove these once we make this class private.
  @deprecated("use bytesWritten instead", "2.0.0")
  def shuffleBytesWritten: Long = bytesWritten

  @deprecated("use writeTime instead", "2.0.0")
  def shuffleWriteTime: Long = writeTime

  @deprecated("use recordsWritten instead", "2.0.0")
  def shuffleRecordsWritten: Long = recordsWritten

}