1 /** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 */ 19 package org.apache.hadoop.hbase.mapreduce; 20 21 import java.io.IOException; 22 23 import org.apache.commons.logging.Log; 24 import org.apache.commons.logging.LogFactory; 25 import org.apache.hadoop.hbase.classification.InterfaceAudience; 26 import org.apache.hadoop.hbase.classification.InterfaceStability; 27 import org.apache.hadoop.conf.Configuration; 28 import org.apache.hadoop.fs.Path; 29 import org.apache.hadoop.hbase.HBaseConfiguration; 30 import org.apache.hadoop.hbase.client.Result; 31 import org.apache.hadoop.hbase.client.Scan; 32 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 33 import org.apache.hadoop.hbase.filter.Filter; 34 import org.apache.hadoop.hbase.filter.IncompatibleFilterException; 35 import org.apache.hadoop.hbase.filter.PrefixFilter; 36 import org.apache.hadoop.hbase.filter.RegexStringComparator; 37 import org.apache.hadoop.hbase.filter.RowFilter; 38 import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 39 import org.apache.hadoop.hbase.util.Bytes; 40 import org.apache.hadoop.mapreduce.Job; 41 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 42 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; 43 import org.apache.hadoop.util.GenericOptionsParser; 44 45 /** 46 * Export an HBase table. 47 * Writes content to sequence files up in HDFS. Use {@link Import} to read it 48 * back in again. 49 */ 50 @InterfaceAudience.Public 51 @InterfaceStability.Stable 52 public class Export { 53 private static final Log LOG = LogFactory.getLog(Export.class); 54 final static String NAME = "export"; 55 final static String RAW_SCAN = "hbase.mapreduce.include.deleted.rows"; 56 final static String EXPORT_BATCHING = "hbase.export.scanner.batch"; 57 58 /** 59 * Sets up the actual job. 60 * 61 * @param conf The current configuration. 62 * @param args The command line parameters. 63 * @return The newly created job. 64 * @throws IOException When setting up the job fails. 65 */ createSubmittableJob(Configuration conf, String[] args)66 public static Job createSubmittableJob(Configuration conf, String[] args) 67 throws IOException { 68 String tableName = args[0]; 69 Path outputDir = new Path(args[1]); 70 Job job = new Job(conf, NAME + "_" + tableName); 71 job.setJobName(NAME + "_" + tableName); 72 job.setJarByClass(Export.class); 73 // Set optional scan parameters 74 Scan s = getConfiguredScanForJob(conf, args); 75 IdentityTableMapper.initJob(tableName, s, IdentityTableMapper.class, job); 76 // No reducers. Just write straight to output files. 77 job.setNumReduceTasks(0); 78 job.setOutputFormatClass(SequenceFileOutputFormat.class); 79 job.setOutputKeyClass(ImmutableBytesWritable.class); 80 job.setOutputValueClass(Result.class); 81 FileOutputFormat.setOutputPath(job, outputDir); // job conf doesn't contain the conf so doesn't have a default fs. 82 return job; 83 } 84 getConfiguredScanForJob(Configuration conf, String[] args)85 private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException { 86 Scan s = new Scan(); 87 // Optional arguments. 88 // Set Scan Versions 89 int versions = args.length > 2? Integer.parseInt(args[2]): 1; 90 s.setMaxVersions(versions); 91 // Set Scan Range 92 long startTime = args.length > 3? Long.parseLong(args[3]): 0L; 93 long endTime = args.length > 4? Long.parseLong(args[4]): Long.MAX_VALUE; 94 s.setTimeRange(startTime, endTime); 95 // Set cache blocks 96 s.setCacheBlocks(false); 97 // set Start and Stop row 98 if (conf.get(TableInputFormat.SCAN_ROW_START) != null) { 99 s.setStartRow(Bytes.toBytes(conf.get(TableInputFormat.SCAN_ROW_START))); 100 } 101 if (conf.get(TableInputFormat.SCAN_ROW_STOP) != null) { 102 s.setStopRow(Bytes.toBytes(conf.get(TableInputFormat.SCAN_ROW_STOP))); 103 } 104 // Set Scan Column Family 105 boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN)); 106 if (raw) { 107 s.setRaw(raw); 108 } 109 110 if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) { 111 s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY))); 112 } 113 // Set RowFilter or Prefix Filter if applicable. 114 Filter exportFilter = getExportFilter(args); 115 if (exportFilter!= null) { 116 LOG.info("Setting Scan Filter for Export."); 117 s.setFilter(exportFilter); 118 } 119 120 int batching = conf.getInt(EXPORT_BATCHING, -1); 121 if (batching != -1){ 122 try { 123 s.setBatch(batching); 124 } catch (IncompatibleFilterException e) { 125 LOG.error("Batching could not be set", e); 126 } 127 } 128 LOG.info("versions=" + versions + ", starttime=" + startTime + 129 ", endtime=" + endTime + ", keepDeletedCells=" + raw); 130 return s; 131 } 132 getExportFilter(String[] args)133 private static Filter getExportFilter(String[] args) { 134 Filter exportFilter = null; 135 String filterCriteria = (args.length > 5) ? args[5]: null; 136 if (filterCriteria == null) return null; 137 if (filterCriteria.startsWith("^")) { 138 String regexPattern = filterCriteria.substring(1, filterCriteria.length()); 139 exportFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regexPattern)); 140 } else { 141 exportFilter = new PrefixFilter(Bytes.toBytes(filterCriteria)); 142 } 143 return exportFilter; 144 } 145 146 /* 147 * @param errorMsg Error message. Can be null. 148 */ usage(final String errorMsg)149 private static void usage(final String errorMsg) { 150 if (errorMsg != null && errorMsg.length() > 0) { 151 System.err.println("ERROR: " + errorMsg); 152 } 153 System.err.println("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " + 154 "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]\n"); 155 System.err.println(" Note: -D properties will be applied to the conf used. "); 156 System.err.println(" For example: "); 157 System.err.println(" -D mapreduce.output.fileoutputformat.compress=true"); 158 System.err.println(" -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec"); 159 System.err.println(" -D mapreduce.output.fileoutputformat.compress.type=BLOCK"); 160 System.err.println(" Additionally, the following SCAN properties can be specified"); 161 System.err.println(" to control/limit what is exported.."); 162 System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>"); 163 System.err.println(" -D " + RAW_SCAN + "=true"); 164 System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "=<ROWSTART>"); 165 System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "=<ROWSTOP>"); 166 System.err.println("For performance consider the following properties:\n" 167 + " -Dhbase.client.scanner.caching=100\n" 168 + " -Dmapreduce.map.speculative=false\n" 169 + " -Dmapreduce.reduce.speculative=false"); 170 System.err.println("For tables with very wide rows consider setting the batch size as below:\n" 171 + " -D" + EXPORT_BATCHING + "=10"); 172 } 173 174 /** 175 * Main entry point. 176 * 177 * @param args The command line parameters. 178 * @throws Exception When running the job fails. 179 */ main(String[] args)180 public static void main(String[] args) throws Exception { 181 Configuration conf = HBaseConfiguration.create(); 182 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 183 if (otherArgs.length < 2) { 184 usage("Wrong number of arguments: " + otherArgs.length); 185 System.exit(-1); 186 } 187 Job job = createSubmittableJob(conf, otherArgs); 188 System.exit(job.waitForCompletion(true)? 0 : 1); 189 } 190 } 191