/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static java.lang.String.format;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Tool to load the output of HFileOutputFormat into an existing table.
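 * <p>
 * A typical invocation through the standard {@code hbase} launcher script looks like the
 * following (the path and table name are illustrative); pass -Dcreate.table=no if the target
 * table already exists and should not be created by this tool:
 * <pre>
 *   hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles \
 *       -Dcreate.table=no /path/to/hfileoutputformat-output mytable
 * </pre>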
 * @see #usage()
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class LoadIncrementalHFiles extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
  private Admin hbAdmin;

  public static final String NAME = "completebulkload";
  public static final String MAX_FILES_PER_REGION_PER_FAMILY
    = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
  private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
  public final static String CREATE_TABLE_CONF_KEY = "create.table";
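
  // The keys above are read in initialize(); when the tool is launched through ToolRunner
  // they can also be set on the command line, for example (values are illustrative):
  //   -Dhbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily=64 -Dcreate.table=no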

  private int maxFilesPerRegionPerFamily;
  private boolean assignSeqIds;

  // Source filesystem
  private FileSystem fs;
  // Source delegation token
  private FsDelegationToken fsDelegationToken;
  private String bulkToken;
  private UserProvider userProvider;

  private LoadIncrementalHFiles() {}

  public LoadIncrementalHFiles(Configuration conf) throws Exception {
    super(conf);
    initialize();
  }

  private void initialize() throws Exception {
    if (hbAdmin == null) {
      // make a copy, just to be sure we're not overriding someone else's config
      setConf(HBaseConfiguration.create(getConf()));
      Configuration conf = getConf();
      // disable blockcache for tool invocation, see HBASE-10500
      conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
      this.hbAdmin = new HBaseAdmin(conf);
      this.userProvider = UserProvider.instantiate(conf);
      this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
      assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
      maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
    }
  }

  private void usage() {
    System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
        + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
        + "  Note: if you set this to 'no', then the target table must already exist in HBase\n"
        + "\n");
  }

  private static interface BulkHFileVisitor<TFamily> {
    TFamily bulkFamily(final byte[] familyName)
      throws IOException;
    void bulkHFile(final TFamily family, final FileStatus hfileStatus)
      throws IOException;
  }

  /**
   * Iterate over the bulkDir hfiles.
   * Skip reference, HFileLink, files starting with "_" and non-valid hfiles.
   */
  private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
    final BulkHFileVisitor<TFamily> visitor) throws IOException {
    visitBulkHFiles(fs, bulkDir, visitor, true);
  }

  /**
   * Iterate over the bulkDir hfiles.
   * Skip reference, HFileLink, files starting with "_".
   * Check and skip non-valid hfiles by default, or skip this validation by setting
   * 'hbase.loadincremental.validate.hfile' to false.
   */
  private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
    final BulkHFileVisitor<TFamily> visitor, final boolean validateHFile) throws IOException {
    if (!fs.exists(bulkDir)) {
      throw new FileNotFoundException("Bulkload dir " + bulkDir + " not found");
    }

    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
    if (familyDirStatuses == null) {
      throw new FileNotFoundException("No families found in " + bulkDir);
    }

    for (FileStatus familyStat : familyDirStatuses) {
      if (!familyStat.isDirectory()) {
        LOG.warn("Skipping non-directory " + familyStat.getPath());
        continue;
      }
      Path familyDir = familyStat.getPath();
      byte[] familyName = familyDir.getName().getBytes();
      TFamily family = visitor.bulkFamily(familyName);

      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
      for (FileStatus hfileStatus : hfileStatuses) {
        if (!fs.isFile(hfileStatus.getPath())) {
          LOG.warn("Skipping non-file " + hfileStatus);
          continue;
        }

        Path hfile = hfileStatus.getPath();
        // Skip "_", reference, HFileLink
        String fileName = hfile.getName();
        if (fileName.startsWith("_")) {
          continue;
        }
        if (StoreFileInfo.isReference(fileName)) {
          LOG.warn("Skipping reference " + fileName);
          continue;
        }
        if (HFileLink.isHFileLink(fileName)) {
          LOG.warn("Skipping HFileLink " + fileName);
          continue;
        }

        // Validate HFile Format if needed
        if (validateHFile) {
          try {
            if (!HFile.isHFileFormat(fs, hfile)) {
              LOG.warn("the file " + hfile + " doesn't seem to be an HFile, skipping");
              continue;
            }
          } catch (FileNotFoundException e) {
            LOG.warn("the file " + hfile + " was removed");
            continue;
          }
        }

        visitor.bulkHFile(family, hfileStatus);
      }
    }
  }

  /**
   * Represents an HFile waiting to be loaded. A queue is used
   * in this class in order to support the case where a region has
   * split during the process of the load. When this happens,
   * the HFile is split into two physical parts across the new
   * region boundary, and each part is added back into the queue.
   * The import process finishes when the queue is empty.
   */
  static class LoadQueueItem {
    final byte[] family;
    final Path hfilePath;

    public LoadQueueItem(byte[] family, Path hfilePath) {
      this.family = family;
      this.hfilePath = hfilePath;
    }

    @Override
    public String toString() {
      return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString();
    }
  }

  /**
   * Walk the given directory for all HFiles, and return a Queue
   * containing all such files.
   */
  private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
    final boolean validateHFile) throws IOException {
    fs = hfofDir.getFileSystem(getConf());
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
      @Override
      public byte[] bulkFamily(final byte[] familyName) {
        return familyName;
      }
      @Override
      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
        long length = hfile.getLen();
        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
            HConstants.DEFAULT_MAX_FILE_SIZE)) {
          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " +
              length + " bytes can be problematic as it may lead to oversplitting.");
        }
        ret.add(new LoadQueueItem(family, hfile.getPath()));
      }
    }, validateHFile);
  }

  /**
   * Perform a bulk load of the given directory into the given
   * pre-existing table.  This method is not threadsafe.
   *
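   * <p>
   * A minimal programmatic sketch, mirroring {@link #run(String[])} (the table name and
   * directory path are illustrative):
   * <pre>
   *   Configuration conf = HBaseConfiguration.create();
   *   try (Connection connection = ConnectionFactory.createConnection(conf);
   *       HTable table = (HTable) connection.getTable(TableName.valueOf("mytable"))) {
   *     new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/path/to/hfileoutputformat-output"), table);
   *   }
   * </pre>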
   * @param hfofDir the directory that was provided as the output path
   * of a job using HFileOutputFormat
   * @param table the table to load into
   * @throws TableNotFoundException if table does not yet exist
   */
  @SuppressWarnings("deprecation")
  public void doBulkLoad(Path hfofDir, final HTable table)
    throws TableNotFoundException, IOException
  {
    Admin admin = null;
    Table t = table;
    Connection conn = table.getConnection();
    boolean closeConnWhenFinished = false;
    try {
      if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
        LOG.warn("managed connection cannot be used for bulkload. Creating unmanaged connection.");
        // can only use unmanaged connections from here on out.
        conn = ConnectionFactory.createConnection(table.getConfiguration());
        t = conn.getTable(table.getName());
        closeConnWhenFinished = true;
        if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
          throw new RuntimeException("Failed to create unmanaged connection.");
        }
        admin = conn.getAdmin();
      } else {
        admin = conn.getAdmin();
      }
      try (RegionLocator rl = conn.getRegionLocator(t.getName())) {
        doBulkLoad(hfofDir, admin, t, rl);
      }
    } finally {
      if (admin != null) admin.close();
      if (closeConnWhenFinished) {
        t.close();
        conn.close();
      }
    }
  }

  /**
   * Perform a bulk load of the given directory into the given
   * pre-existing table.  This method is not threadsafe.
   *
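   * <p>
   * A minimal sketch against an unmanaged connection (names are illustrative):
   * <pre>
   *   try (Connection conn = ConnectionFactory.createConnection(conf);
   *       Table table = conn.getTable(tableName);
   *       RegionLocator locator = conn.getRegionLocator(tableName);
   *       Admin admin = conn.getAdmin()) {
   *     new LoadIncrementalHFiles(conf).doBulkLoad(hfofDir, admin, table, locator);
   *   }
   * </pre>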
   * @param hfofDir the directory that was provided as the output path
   * of a job using HFileOutputFormat
   * @param table the table to load into
   * @throws TableNotFoundException if table does not yet exist
   */
  @SuppressWarnings("deprecation")
  public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
      RegionLocator regionLocator) throws TableNotFoundException, IOException  {

    if (!admin.isTableAvailable(regionLocator.getName())) {
      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
    }

    // initialize thread pools
    int nrThreads = getConf().getInt("hbase.loadincremental.threads.max",
      Runtime.getRuntime().availableProcessors());
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
    builder.setNameFormat("LoadIncrementalHFiles-%1$d");
    ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
        60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(),
        builder.build());
    ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);

    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
    try {
      /*
       * Checking hfile format is a time-consuming operation, we should have an option to skip
       * this step when bulkloading millions of HFiles. See HBASE-13985.
       */
      boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
      if (!validateHFile) {
        LOG.warn("You are skipping HFile validation; this might cause data loss if the files " +
            "are not correct. If you fail to read data from your table after using this " +
            "option, consider removing the files and bulkloading again without this option. " +
            "See HBASE-13985");
      }
      discoverLoadQueue(queue, hfofDir, validateHFile);
      // check whether there are invalid family names in the HFiles to be bulkloaded
      Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
      ArrayList<String> familyNames = new ArrayList<String>(families.size());
      for (HColumnDescriptor family : families) {
        familyNames.add(family.getNameAsString());
      }
      ArrayList<String> unmatchedFamilies = new ArrayList<String>();
      Iterator<LoadQueueItem> queueIter = queue.iterator();
      while (queueIter.hasNext()) {
        LoadQueueItem lqi = queueIter.next();
        String familyNameInHFile = Bytes.toString(lqi.family);
        if (!familyNames.contains(familyNameInHFile)) {
          unmatchedFamilies.add(familyNameInHFile);
        }
      }
      if (unmatchedFamilies.size() > 0) {
        String msg =
            "Unmatched family names found in HFiles to be bulkloaded: "
                + unmatchedFamilies + "; valid family names of table "
                + table.getName() + " are: " + familyNames;
        LOG.error(msg);
        throw new IOException(msg);
      }
      int count = 0;

      if (queue.isEmpty()) {
        LOG.warn("Bulk load operation did not find any files to load in " +
            "directory " + hfofDir.toUri() + ".  Does it contain files in " +
            "subdirectories that correspond to column family names?");
        return;
      }

      //If using secure bulk load, get source delegation token, and
      //prepare staging directory and token
      // fs is the source filesystem
      fsDelegationToken.acquireDelegationToken(fs);
      if(isSecureBulkLoadEndpointAvailable()) {
        bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
      }

      // Assumes that region splits can happen while this occurs.
      while (!queue.isEmpty()) {
        // need to reload split keys each iteration.
        final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
        if (count != 0) {
          LOG.info("Split occurred while grouping HFiles, retry attempt " +
              count + " with " + queue.size() + " files remaining to group or split");
        }

        int maxRetries = getConf().getInt("hbase.bulkload.retries.number", 10);
        maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
        if (maxRetries != 0 && count >= maxRetries) {
          throw new IOException("Retry attempted " + count +
            " times without completing, bailing out");
        }
        count++;

        // Using ByteBuffer for byte[] equality semantics
        Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
            pool, queue, startEndKeys);

        if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
          // Error is logged inside checkHFilesCountPerRegionPerFamily.
          throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
            + " hfiles to one family of one region");
        }

        bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);

        // NOTE: The next iteration's split / group could happen in parallel to
        // atomic bulkloads assuming that there are splits and no merges, and
        // that we can atomically pull out the groups we want to retry.
      }

    } finally {
      fsDelegationToken.releaseDelegationToken();
      if(bulkToken != null) {
        new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
      }
      pool.shutdown();
      if (queue != null && !queue.isEmpty()) {
        StringBuilder err = new StringBuilder();
        err.append("-------------------------------------------------\n");
        err.append("Bulk load aborted with some files not yet loaded:\n");
        err.append("-------------------------------------------------\n");
        for (LoadQueueItem q : queue) {
          err.append("  ").append(q.hfilePath).append('\n');
        }
        LOG.error(err);
      }
    }

    if (queue != null && !queue.isEmpty()) {
        throw new RuntimeException("Bulk load aborted with some files not yet loaded. "
          + "Please check the log for more details.");
    }
  }

  /**
   * This takes the LQI's grouped by likely regions and attempts to bulk load
   * them.  Any failures are re-queued for another pass with the
   * groupOrSplitPhase.
   */
  protected void bulkLoadPhase(final Table table, final Connection conn,
      ExecutorService pool, Deque<LoadQueueItem> queue,
      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
    // atomically bulk load the groups.
    Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<Future<List<LoadQueueItem>>>();
    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
      final byte[] first = e.getKey().array();
      final Collection<LoadQueueItem> lqis =  e.getValue();

      final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
        @Override
        public List<LoadQueueItem> call() throws Exception {
          List<LoadQueueItem> toRetry =
              tryAtomicRegionLoad(conn, table.getName(), first, lqis);
          return toRetry;
        }
      };
      loadingFutures.add(pool.submit(call));
    }

    // get all the results.
    for (Future<List<LoadQueueItem>> future : loadingFutures) {
      try {
        List<LoadQueueItem> toRetry = future.get();

        // LQIs that are requeued to be regrouped.
        queue.addAll(toRetry);

      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          // At this point something unrecoverable has happened.
          // TODO Implement bulk load recovery
          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
        }
        LOG.error("Unexpected execution exception during bulk load", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during bulk load", e1);
        throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
      }
    }
  }

  private boolean checkHFilesCountPerRegionPerFamily(
      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
    for (Entry<ByteBuffer,
        ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
      final Collection<LoadQueueItem> lqis =  e.getValue();
      HashMap<byte[], MutableInt> filesMap = new HashMap<byte[], MutableInt>();
      for (LoadQueueItem lqi: lqis) {
        MutableInt count = filesMap.get(lqi.family);
        if (count == null) {
          count = new MutableInt();
          filesMap.put(lqi.family, count);
        }
        count.increment();
        if (count.intValue() > maxFilesPerRegionPerFamily) {
          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily
            + " hfiles to family " + Bytes.toStringBinary(lqi.family)
            + " of region with start key "
            + Bytes.toStringBinary(e.getKey()));
          return false;
        }
      }
    }
    return true;
  }

  /**
   * @return A Multimap<startkey, LoadQueueItem> that groups LQI by likely
   * bulk load region targets.
   */
  private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final Table table,
      ExecutorService pool, Deque<LoadQueueItem> queue,
      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    // The <region start key, LQI> map needs to be synchronized only within the scope of this
    // phase because of the puts that happen in futures.
    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);

    // drain LQIs and figure out bulk load groups
    Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
    while (!queue.isEmpty()) {
      final LoadQueueItem item = queue.remove();

      final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
        @Override
        public List<LoadQueueItem> call() throws Exception {
          List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
          return splits;
        }
      };
      splittingFutures.add(pool.submit(call));
    }
    // get all the results.  All grouping and splitting must finish before
    // we can attempt the atomic loads.
    for (Future<List<LoadQueueItem>> lqis : splittingFutures) {
      try {
        List<LoadQueueItem> splits = lqis.get();
        if (splits != null) {
          queue.addAll(splits);
        }
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          LOG.error("IOException during splitting", e1);
          throw (IOException)t; // would have been thrown if not parallelized,
        }
        LOG.error("Unexpected execution exception during splitting", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during splitting", e1);
        throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
      }
    }
    return regionGroups;
  }

  // unique file name for the table
  private String getUniqueName() {
    return UUID.randomUUID().toString().replaceAll("-", "");
  }

  protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item,
      final Table table, byte[] startKey,
      byte[] splitKey) throws IOException {
    final Path hfilePath = item.hfilePath;

    // We use a '_' prefix which is ignored when walking directory trees
    // above.
    final String TMP_DIR = "_tmp";
    Path tmpDir = item.hfilePath.getParent();
    if (!tmpDir.getName().equals(TMP_DIR)) {
      tmpDir = new Path(tmpDir, TMP_DIR);
    }

    LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
        "region. Splitting...");

    String uniqueName = getUniqueName();
    HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
    Path topOut = new Path(tmpDir, uniqueName + ".top");
    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
        botOut, topOut);

    FileSystem fs = tmpDir.getFileSystem(getConf());
    fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));

    // Add these back at the *front* of the queue, so there's a lower
    // chance that the region will just split again before we get there.
    List<LoadQueueItem> lqis = new ArrayList<LoadQueueItem>(2);
    lqis.add(new LoadQueueItem(item.family, botOut));
    lqis.add(new LoadQueueItem(item.family, topOut));

    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
    return lqis;
  }

  /**
   * Attempt to assign the given load queue item into its target region group.
   * If the hfile boundary no longer fits into a region, physically splits
   * the hfile such that the new bottom half will fit and returns the list of
   * LQI's corresponding to the resultant hfiles.
   *
   * protected for testing
   * @throws IOException
   */
  protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
      final LoadQueueItem item, final Table table,
      final Pair<byte[][], byte[][]> startEndKeys)
      throws IOException {
    final Path hfilePath = item.hfilePath;
    HFile.Reader hfr = HFile.createReader(fs, hfilePath,
        new CacheConfig(getConf()), getConf());
    final byte[] first, last;
    try {
      hfr.loadFileInfo();
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } finally {
      hfr.close();
    }

    LOG.info("Trying to load hfile=" + hfilePath +
        " first=" + Bytes.toStringBinary(first) +
        " last="  + Bytes.toStringBinary(last));
    if (first == null || last == null) {
      assert first == null && last == null;
      // TODO what if this is due to a bad HFile?
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return null;
    }
    if (Bytes.compareTo(first, last) > 0) {
      throw new IllegalArgumentException(
      "Invalid range: " + Bytes.toStringBinary(first) +
      " > " + Bytes.toStringBinary(last));
    }
    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
        Bytes.BYTES_COMPARATOR);
    if (idx < 0) {
      // not on boundary, returns -(insertion index).  Calculate region it
      // would be in.
      idx = -(idx + 1) - 1;
    }
    final int indexForCallable = idx;

    /**
     * We can consider there to be a region hole in the following conditions: 1) if idx < 0, then
     * the first region info is lost; 2) if the endkey of a region is not equal to the startkey of
     * the next region; 3) if the endkey of the last region is not empty.
     */
    if (indexForCallable < 0) {
      throw new IOException("The first region info for table "
          + table.getName()
          + " cannot be found in hbase:meta. Please use the hbck tool to fix it first.");
    } else if ((indexForCallable == startEndKeys.getFirst().length - 1)
        && !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IOException("The last region info for table "
          + table.getName()
          + " cannot be found in hbase:meta. Please use the hbck tool to fix it first.");
    } else if (indexForCallable + 1 < startEndKeys.getFirst().length
        && !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
          startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
      throw new IOException("The endkey of one region for table "
          + table.getName()
          + " is not equal to the startkey of the next region in hbase:meta. "
          + "Please use the hbck tool to fix it first.");
    }

    boolean lastKeyInRange =
      Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
      Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
    if (!lastKeyInRange) {
      List<LoadQueueItem> lqis = splitStoreFile(item, table,
          startEndKeys.getFirst()[indexForCallable],
          startEndKeys.getSecond()[indexForCallable]);
      return lqis;
    }

    // group regions.
    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
    return null;
  }

  /**
   * @deprecated As of release 0.96
   *             (<a href="https://issues.apache.org/jira/browse/HBASE-9508">HBASE-9508</a>).
   *             This will be removed in HBase 2.0.0.
   *             Use {@link #tryAtomicRegionLoad(Connection, TableName, byte[], Collection)}.
   */
  @Deprecated
  protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
      final byte [] tableName, final byte[] first, Collection<LoadQueueItem> lqis)
  throws IOException {
    return tryAtomicRegionLoad(conn, TableName.valueOf(tableName), first, lqis);
  }

  /**
   * Attempts to do an atomic load of many hfiles into a region.  If it fails,
   * it returns a list of hfiles that need to be retried.  If it is successful
   * it will return an empty list.
   *
   * NOTE: To maintain row atomicity guarantees, the region server callable should
   * succeed atomically and fail atomically.
   *
   * Protected for testing.
   *
   * @return empty list if success, list of items to retry on recoverable
   * failure
   */
  protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
      final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
  throws IOException {
    final List<Pair<byte[], String>> famPaths =
      new ArrayList<Pair<byte[], String>>(lqis.size());
    for (LoadQueueItem lqi : lqis) {
      famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
    }

    final RegionServerCallable<Boolean> svrCallable =
        new RegionServerCallable<Boolean>(conn, tableName, first) {
      @Override
      public Boolean call(int callTimeout) throws Exception {
        SecureBulkLoadClient secureClient = null;
        boolean success = false;

        try {
          LOG.debug("Going to connect to server " + getLocation() + " for row "
              + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
          byte[] regionName = getLocation().getRegionInfo().getRegionName();
          if (!isSecureBulkLoadEndpointAvailable()) {
            success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
          } else {
            try (Table table = conn.getTable(getTableName())) {
              secureClient = new SecureBulkLoadClient(table);
              success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(),
                bulkToken, getLocation().getRegionInfo().getStartKey());
            }
          }
          return success;
        } finally {
          //Best effort copying of files that might not have been imported
          //from the staging directory back to original location
          //in user directory
          if(secureClient != null && !success) {
            FileSystem targetFs = FileSystem.get(getConf());
            // Check to see if the source and target filesystems are the same
            // If they are the same filesystem, we will try move the files back
            // because previously we moved them to the staging directory.
            if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
              for(Pair<byte[], String> el : famPaths) {
                Path hfileStagingPath = null;
                Path hfileOrigPath = new Path(el.getSecond());
                try {
                  hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
                    hfileOrigPath.getName());
                  if(targetFs.rename(hfileStagingPath, hfileOrigPath)) {
                    LOG.debug("Moved back file " + hfileOrigPath + " from " +
                        hfileStagingPath);
                  } else if(targetFs.exists(hfileStagingPath)){
                    LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
                        hfileStagingPath);
                  }
                } catch(Exception ex) {
                  LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
                      hfileStagingPath, ex);
                }
              }
            }
          }
        }
      }
    };

    try {
      List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
      Configuration conf = getConf();
      boolean success = RpcRetryingCallerFactory.instantiate(conf,
          null).<Boolean> newCaller()
          .callWithRetries(svrCallable, Integer.MAX_VALUE);
      if (!success) {
        LOG.warn("Attempt to bulk load region containing "
            + Bytes.toStringBinary(first) + " into table "
            + tableName  + " with files " + lqis
            + " failed.  This is recoverable and they will be retried.");
        toRetry.addAll(lqis); // return lqi's to retry
      }
      // success
      return toRetry;
    } catch (IOException e) {
      LOG.error("Encountered unrecoverable error from region server, additional details: "
          + svrCallable.getExceptionMessageAdditionalDetail(), e);
      throw e;
    }
  }

  private boolean isSecureBulkLoadEndpointAvailable() {
    String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
  }

  /**
   * Split a storefile into a top and bottom half, maintaining
   * the metadata, recreating bloom filters, etc.
   */
  static void splitStoreFile(
      Configuration conf, Path inFile,
      HColumnDescriptor familyDesc, byte[] splitKey,
      Path bottomOut, Path topOut) throws IOException
  {
    // Open reader with no block cache, and not in-memory
    Reference topReference = Reference.createTopReference(splitKey);
    Reference bottomReference = Reference.createBottomReference(splitKey);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
  }

  /**
   * Copy half of an HFile into a new HFile.
   */
  private static void copyHFileHalf(
      Configuration conf, Path inFile, Path outFile, Reference reference,
      HColumnDescriptor familyDescriptor)
  throws IOException {
    FileSystem fs = inFile.getFileSystem(conf);
    CacheConfig cacheConf = new CacheConfig(conf);
    HalfStoreFileReader halfReader = null;
    StoreFile.Writer halfWriter = null;
    try {
      halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, conf);
      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

      int blocksize = familyDescriptor.getBlocksize();
      Algorithm compression = familyDescriptor.getCompression();
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
      HFileContext hFileContext = new HFileContextBuilder()
                                  .withCompression(compression)
                                  .withChecksumType(HStore.getChecksumType(conf))
                                  .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
                                  .withBlockSize(blocksize)
                                  .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding())
                                  .withIncludesTags(true)
                                  .build();
      halfWriter = new StoreFile.WriterBuilder(conf, cacheConf,
          fs)
              .withFilePath(outFile)
              .withBloomType(bloomFilterType)
              .withFileContext(hFileContext)
              .build();
      HFileScanner scanner = halfReader.getScanner(false, false, false);
      scanner.seekTo();
      do {
        halfWriter.append(scanner.getKeyValue());
      } while (scanner.next());

      for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfWriter != null) halfWriter.close();
      if (halfReader != null) halfReader.close(cacheConf.shouldEvictOnClose());
    }
  }

  private static boolean shouldCopyHFileMetaKey(byte[] key) {
    // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
    if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
      return false;
    }

    return !HFile.isReservedFileInfoKey(key);
  }

  private boolean doesTableExist(TableName tableName) throws Exception {
    return hbAdmin.tableExists(tableName);
  }

  /*
   * Infers region boundaries for a new table.
   * Parameter:
   *   bdryMap is a map from each key to an integer in {+1, -1}:
   *     If a key is a start key of a file, then it maps to +1
   *     If a key is an end key of a file, then it maps to -1
   * Algorithm:
   * 1) Traverse the keys in sorted order:
   *    a) Keep adding the mapped values to these keys (runningSum)
   *    b) Each time runningSum returns to 0, add the start key at which the runningSum last
   *       started to the boundary list.
   * 2) Return the boundary list.
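   *
   * For example (illustrative), two HFiles spanning [a,b] and [c,d] produce the map
   * {a:+1, b:-1, c:+1, d:-1}. The runningSum returns to 0 at b (the first boundary, which is
   * skipped) and again at d, so the single boundary emitted is c and the table is created
   * with two regions, [,c) and [c,).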
   */
  public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
    ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
    int runningValue = 0;
    byte[] currStartKey = null;
    boolean firstBoundary = true;

    for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
      if (runningValue == 0) currStartKey = item.getKey();
      runningValue += item.getValue();
      if (runningValue == 0) {
        if (!firstBoundary) keysArray.add(currStartKey);
        firstBoundary = false;
      }
    }

    return keysArray.toArray(new byte[0][0]);
  }

  /*
   * If the table is created for the first time, then "completebulkload" reads the files twice.
   * More modifications necessary if we want to avoid doing it.
   */
  private void createTable(TableName tableName, String dirPath) throws Exception {
    final Path hfofDir = new Path(dirPath);
    final FileSystem fs = hfofDir.getFileSystem(getConf());

    // Add column families
    // Build a set of keys
    final HTableDescriptor htd = new HTableDescriptor(tableName);
    final TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() {
      @Override
      public HColumnDescriptor bulkFamily(final byte[] familyName) {
        HColumnDescriptor hcd = new HColumnDescriptor(familyName);
        htd.addFamily(hcd);
        return hcd;
      }
      @Override
      public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus)
          throws IOException {
        Path hfile = hfileStatus.getPath();
        HFile.Reader reader = HFile.createReader(fs, hfile,
            new CacheConfig(getConf()), getConf());
        try {
          if (hcd.getCompressionType() != reader.getFileContext().getCompression()) {
            hcd.setCompressionType(reader.getFileContext().getCompression());
            LOG.info("Setting compression " + hcd.getCompressionType().name() +
                     " for family " + hcd.toString());
          }
          reader.loadFileInfo();
          byte[] first = reader.getFirstRowKey();
          byte[] last  = reader.getLastRowKey();

          LOG.info("Trying to figure out region boundaries hfile=" + hfile +
            " first=" + Bytes.toStringBinary(first) +
            " last="  + Bytes.toStringBinary(last));

          // To eventually infer start key-end key boundaries
          Integer value = map.containsKey(first)? map.get(first):0;
          map.put(first, value+1);

          value = map.containsKey(last)? map.get(last):0;
          map.put(last, value-1);
        } finally {
          reader.close();
        }
      }
    });

    byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map);
    this.hbAdmin.createTable(htd,keys);

    LOG.info("Table "+ tableName +" is available!!");
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      usage();
      return -1;
    }

    initialize();

    String dirPath = args[0];
    TableName tableName = TableName.valueOf(args[1]);

    boolean tableExists = this.doesTableExist(tableName);
    if (!tableExists) {
      if ("yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) {
        this.createTable(tableName, dirPath);
      } else {
        String errorMsg = format("Table '%s' does not exist.", tableName);
        LOG.error(errorMsg);
        throw new TableNotFoundException(errorMsg);
      }
    }

    Path hfofDir = new Path(dirPath);

    try (Connection connection = ConnectionFactory.createConnection(getConf());
        HTable table = (HTable) connection.getTable(tableName);) {
      doBulkLoad(hfofDir, table);
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(), args);
    System.exit(ret);
  }

}