1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.mapred.gridmix;
19 
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23 
24 import org.apache.hadoop.io.WritableUtils;
25 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
26 
27 class GridmixSplit extends CombineFileSplit {
28   private int id;
29   private int nSpec;
30   private int maps;
31   private int reduces;
32   private long inputRecords;
33   private long outputBytes;
34   private long outputRecords;
35   private long maxMemory;
36   private double[] reduceBytes = new double[0];
37   private double[] reduceRecords = new double[0];
38 
39   // Spec for reduces id mod this
40   private long[] reduceOutputBytes = new long[0];
41   private long[] reduceOutputRecords = new long[0];
42 
GridmixSplit()43   GridmixSplit() {
44     super();
45   }
46 
GridmixSplit(CombineFileSplit cfsplit, int maps, int id, long inputBytes, long inputRecords, long outputBytes, long outputRecords, double[] reduceBytes, double[] reduceRecords, long[] reduceOutputBytes, long[] reduceOutputRecords)47   public GridmixSplit(CombineFileSplit cfsplit, int maps, int id,
48       long inputBytes, long inputRecords, long outputBytes,
49       long outputRecords, double[] reduceBytes, double[] reduceRecords,
50       long[] reduceOutputBytes, long[] reduceOutputRecords)
51       throws IOException {
52     super(cfsplit);
53     this.id = id;
54     this.maps = maps;
55     reduces = reduceBytes.length;
56     this.inputRecords = inputRecords;
57     this.outputBytes = outputBytes;
58     this.outputRecords = outputRecords;
59     this.reduceBytes = reduceBytes;
60     this.reduceRecords = reduceRecords;
61     nSpec = reduceOutputBytes.length;
62     this.reduceOutputBytes = reduceOutputBytes;
63     this.reduceOutputRecords = reduceOutputRecords;
64   }
getId()65   public int getId() {
66     return id;
67   }
getMapCount()68   public int getMapCount() {
69     return maps;
70   }
getInputRecords()71   public long getInputRecords() {
72     return inputRecords;
73   }
getOutputBytes()74   public long[] getOutputBytes() {
75     if (0 == reduces) {
76       return new long[] { outputBytes };
77     }
78     final long[] ret = new long[reduces];
79     for (int i = 0; i < reduces; ++i) {
80       ret[i] = Math.round(outputBytes * reduceBytes[i]);
81     }
82     return ret;
83   }
getOutputRecords()84   public long[] getOutputRecords() {
85     if (0 == reduces) {
86       return new long[] { outputRecords };
87     }
88     final long[] ret = new long[reduces];
89     for (int i = 0; i < reduces; ++i) {
90       ret[i] = Math.round(outputRecords * reduceRecords[i]);
91     }
92     return ret;
93   }
getReduceBytes(int i)94   public long getReduceBytes(int i) {
95     return reduceOutputBytes[i];
96   }
getReduceRecords(int i)97   public long getReduceRecords(int i) {
98     return reduceOutputRecords[i];
99   }
100   @Override
write(DataOutput out)101   public void write(DataOutput out) throws IOException {
102     super.write(out);
103     WritableUtils.writeVInt(out, id);
104     WritableUtils.writeVInt(out, maps);
105     WritableUtils.writeVLong(out, inputRecords);
106     WritableUtils.writeVLong(out, outputBytes);
107     WritableUtils.writeVLong(out, outputRecords);
108     WritableUtils.writeVLong(out, maxMemory);
109     WritableUtils.writeVInt(out, reduces);
110     for (int i = 0; i < reduces; ++i) {
111       out.writeDouble(reduceBytes[i]);
112       out.writeDouble(reduceRecords[i]);
113     }
114     WritableUtils.writeVInt(out, nSpec);
115     for (int i = 0; i < nSpec; ++i) {
116       WritableUtils.writeVLong(out, reduceOutputBytes[i]);
117       WritableUtils.writeVLong(out, reduceOutputRecords[i]);
118     }
119   }
120   @Override
readFields(DataInput in)121   public void readFields(DataInput in) throws IOException {
122     super.readFields(in);
123     id = WritableUtils.readVInt(in);
124     maps = WritableUtils.readVInt(in);
125     inputRecords = WritableUtils.readVLong(in);
126     outputBytes = WritableUtils.readVLong(in);
127     outputRecords = WritableUtils.readVLong(in);
128     maxMemory = WritableUtils.readVLong(in);
129     reduces = WritableUtils.readVInt(in);
130     if (reduceBytes.length < reduces) {
131       reduceBytes = new double[reduces];
132       reduceRecords = new double[reduces];
133     }
134     for (int i = 0; i < reduces; ++i) {
135       reduceBytes[i] = in.readDouble();
136       reduceRecords[i] = in.readDouble();
137     }
138     nSpec = WritableUtils.readVInt(in);
139     if (reduceOutputBytes.length < nSpec) {
140       reduceOutputBytes = new long[nSpec];
141       reduceOutputRecords = new long[nSpec];
142     }
143     for (int i = 0; i < nSpec; ++i) {
144       reduceOutputBytes[i] = WritableUtils.readVLong(in);
145       reduceOutputRecords[i] = WritableUtils.readVLong(in);
146     }
147   }
148 }
149