1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.mapred.gridmix; 19 20 import java.io.DataInput; 21 import java.io.DataOutput; 22 import java.io.IOException; 23 24 import org.apache.hadoop.io.WritableUtils; 25 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; 26 27 class GridmixSplit extends CombineFileSplit { 28 private int id; 29 private int nSpec; 30 private int maps; 31 private int reduces; 32 private long inputRecords; 33 private long outputBytes; 34 private long outputRecords; 35 private long maxMemory; 36 private double[] reduceBytes = new double[0]; 37 private double[] reduceRecords = new double[0]; 38 39 // Spec for reduces id mod this 40 private long[] reduceOutputBytes = new long[0]; 41 private long[] reduceOutputRecords = new long[0]; 42 GridmixSplit()43 GridmixSplit() { 44 super(); 45 } 46 GridmixSplit(CombineFileSplit cfsplit, int maps, int id, long inputBytes, long inputRecords, long outputBytes, long outputRecords, double[] reduceBytes, double[] reduceRecords, long[] reduceOutputBytes, long[] reduceOutputRecords)47 public GridmixSplit(CombineFileSplit cfsplit, int maps, int id, 48 long inputBytes, long inputRecords, long outputBytes, 49 long outputRecords, double[] reduceBytes, double[] reduceRecords, 50 long[] reduceOutputBytes, long[] reduceOutputRecords) 51 throws IOException { 52 super(cfsplit); 53 this.id = id; 54 this.maps = maps; 55 reduces = reduceBytes.length; 56 this.inputRecords = inputRecords; 57 this.outputBytes = outputBytes; 58 this.outputRecords = outputRecords; 59 this.reduceBytes = reduceBytes; 60 this.reduceRecords = reduceRecords; 61 nSpec = reduceOutputBytes.length; 62 this.reduceOutputBytes = reduceOutputBytes; 63 this.reduceOutputRecords = reduceOutputRecords; 64 } getId()65 public int getId() { 66 return id; 67 } getMapCount()68 public int getMapCount() { 69 return maps; 70 } getInputRecords()71 public long getInputRecords() { 72 return inputRecords; 73 } getOutputBytes()74 public long[] getOutputBytes() { 75 if (0 == reduces) { 76 return new long[] { outputBytes }; 77 } 78 final long[] ret = new long[reduces]; 79 for (int i = 0; i < reduces; ++i) { 80 ret[i] = Math.round(outputBytes * reduceBytes[i]); 81 } 82 return ret; 83 } getOutputRecords()84 public long[] getOutputRecords() { 85 if (0 == reduces) { 86 return new long[] { outputRecords }; 87 } 88 final long[] ret = new long[reduces]; 89 for (int i = 0; i < reduces; ++i) { 90 ret[i] = Math.round(outputRecords * reduceRecords[i]); 91 } 92 return ret; 93 } getReduceBytes(int i)94 public long getReduceBytes(int i) { 95 return reduceOutputBytes[i]; 96 } getReduceRecords(int i)97 public long getReduceRecords(int i) { 98 return reduceOutputRecords[i]; 99 } 100 @Override write(DataOutput out)101 public void write(DataOutput out) throws IOException { 102 super.write(out); 103 WritableUtils.writeVInt(out, id); 104 WritableUtils.writeVInt(out, maps); 105 WritableUtils.writeVLong(out, inputRecords); 106 WritableUtils.writeVLong(out, outputBytes); 107 WritableUtils.writeVLong(out, outputRecords); 108 WritableUtils.writeVLong(out, maxMemory); 109 WritableUtils.writeVInt(out, reduces); 110 for (int i = 0; i < reduces; ++i) { 111 out.writeDouble(reduceBytes[i]); 112 out.writeDouble(reduceRecords[i]); 113 } 114 WritableUtils.writeVInt(out, nSpec); 115 for (int i = 0; i < nSpec; ++i) { 116 WritableUtils.writeVLong(out, reduceOutputBytes[i]); 117 WritableUtils.writeVLong(out, reduceOutputRecords[i]); 118 } 119 } 120 @Override readFields(DataInput in)121 public void readFields(DataInput in) throws IOException { 122 super.readFields(in); 123 id = WritableUtils.readVInt(in); 124 maps = WritableUtils.readVInt(in); 125 inputRecords = WritableUtils.readVLong(in); 126 outputBytes = WritableUtils.readVLong(in); 127 outputRecords = WritableUtils.readVLong(in); 128 maxMemory = WritableUtils.readVLong(in); 129 reduces = WritableUtils.readVInt(in); 130 if (reduceBytes.length < reduces) { 131 reduceBytes = new double[reduces]; 132 reduceRecords = new double[reduces]; 133 } 134 for (int i = 0; i < reduces; ++i) { 135 reduceBytes[i] = in.readDouble(); 136 reduceRecords[i] = in.readDouble(); 137 } 138 nSpec = WritableUtils.readVInt(in); 139 if (reduceOutputBytes.length < nSpec) { 140 reduceOutputBytes = new long[nSpec]; 141 reduceOutputRecords = new long[nSpec]; 142 } 143 for (int i = 0; i < nSpec; ++i) { 144 reduceOutputBytes[i] = WritableUtils.readVLong(in); 145 reduceOutputRecords[i] = WritableUtils.readVLong(in); 146 } 147 } 148 } 149