1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.mapreduce.task.reduce;
19 
20 import org.apache.hadoop.classification.InterfaceAudience;
21 import org.apache.hadoop.classification.InterfaceStability;
22 
23 import org.apache.hadoop.mapred.JobConf;
24 import org.apache.hadoop.mapreduce.MRJobConfig;
25 import org.apache.hadoop.mapreduce.TaskAttemptID;
26 import org.apache.hadoop.metrics.MetricsContext;
27 import org.apache.hadoop.metrics.MetricsRecord;
28 import org.apache.hadoop.metrics.MetricsUtil;
29 import org.apache.hadoop.metrics.Updater;
30 
31 @InterfaceAudience.LimitedPrivate({"MapReduce"})
32 @InterfaceStability.Unstable
33 public class ShuffleClientMetrics implements Updater {
34 
35   private MetricsRecord shuffleMetrics = null;
36   private int numFailedFetches = 0;
37   private int numSuccessFetches = 0;
38   private long numBytes = 0;
39   private int numThreadsBusy = 0;
40   private final int numCopiers;
41 
ShuffleClientMetrics(TaskAttemptID reduceId, JobConf jobConf)42   ShuffleClientMetrics(TaskAttemptID reduceId, JobConf jobConf) {
43     this.numCopiers = jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
44 
45     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
46     this.shuffleMetrics =
47       MetricsUtil.createRecord(metricsContext, "shuffleInput");
48     this.shuffleMetrics.setTag("user", jobConf.getUser());
49     this.shuffleMetrics.setTag("jobName", jobConf.getJobName());
50     this.shuffleMetrics.setTag("jobId", reduceId.getJobID().toString());
51     this.shuffleMetrics.setTag("taskId", reduceId.toString());
52     this.shuffleMetrics.setTag("sessionId", jobConf.getSessionId());
53     metricsContext.registerUpdater(this);
54   }
inputBytes(long numBytes)55   public synchronized void inputBytes(long numBytes) {
56     this.numBytes += numBytes;
57   }
failedFetch()58   public synchronized void failedFetch() {
59     ++numFailedFetches;
60   }
successFetch()61   public synchronized void successFetch() {
62     ++numSuccessFetches;
63   }
threadBusy()64   public synchronized void threadBusy() {
65     ++numThreadsBusy;
66   }
threadFree()67   public synchronized void threadFree() {
68     --numThreadsBusy;
69   }
doUpdates(MetricsContext unused)70   public void doUpdates(MetricsContext unused) {
71     synchronized (this) {
72       shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
73       shuffleMetrics.incrMetric("shuffle_failed_fetches",
74                                 numFailedFetches);
75       shuffleMetrics.incrMetric("shuffle_success_fetches",
76                                 numSuccessFetches);
77       if (numCopiers != 0) {
78         shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
79             100*((float)numThreadsBusy/numCopiers));
80       } else {
81         shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
82       }
83       numBytes = 0;
84       numSuccessFetches = 0;
85       numFailedFetches = 0;
86     }
87     shuffleMetrics.update();
88   }
89 }
90