1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.executor
19
20import org.apache.spark.annotation.DeveloperApi
21import org.apache.spark.util.LongAccumulator
22
23
24/**
25 * :: DeveloperApi ::
26 * A collection of accumulators that represent metrics about writing shuffle data.
27 * Operations are not thread-safe.
28 */
29@DeveloperApi
30class ShuffleWriteMetrics private[spark] () extends Serializable {
31  private[executor] val _bytesWritten = new LongAccumulator
32  private[executor] val _recordsWritten = new LongAccumulator
33  private[executor] val _writeTime = new LongAccumulator
34
35  /**
36   * Number of bytes written for the shuffle by this task.
37   */
38  def bytesWritten: Long = _bytesWritten.sum
39
40  /**
41   * Total number of records written to the shuffle by this task.
42   */
43  def recordsWritten: Long = _recordsWritten.sum
44
45  /**
46   * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds.
47   */
48  def writeTime: Long = _writeTime.sum
49
50  private[spark] def incBytesWritten(v: Long): Unit = _bytesWritten.add(v)
51  private[spark] def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v)
52  private[spark] def incWriteTime(v: Long): Unit = _writeTime.add(v)
53  private[spark] def decBytesWritten(v: Long): Unit = {
54    _bytesWritten.setValue(bytesWritten - v)
55  }
56  private[spark] def decRecordsWritten(v: Long): Unit = {
57    _recordsWritten.setValue(recordsWritten - v)
58  }
59
60  // Legacy methods for backward compatibility.
61  // TODO: remove these once we make this class private.
62  @deprecated("use bytesWritten instead", "2.0.0")
63  def shuffleBytesWritten: Long = bytesWritten
64  @deprecated("use writeTime instead", "2.0.0")
65  def shuffleWriteTime: Long = writeTime
66  @deprecated("use recordsWritten instead", "2.0.0")
67  def shuffleRecordsWritten: Long = recordsWritten
68
69}
70