/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static java.lang.String.format;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Tool to load the output of HFileOutputFormat into an existing table.
 * @see #usage()
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class LoadIncrementalHFiles extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
  private Admin hbAdmin;

  public static final String NAME = "completebulkload";
  public static final String MAX_FILES_PER_REGION_PER_FAMILY
    = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
  private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
  public final static String CREATE_TABLE_CONF_KEY = "create.table";

  private int maxFilesPerRegionPerFamily;
  private boolean assignSeqIds;

  // Source filesystem
  private FileSystem fs;
  // Source delegation token
  private FsDelegationToken fsDelegationToken;
  private String bulkToken;
  private UserProvider userProvider;

  private LoadIncrementalHFiles() {}

  public LoadIncrementalHFiles(Configuration conf) throws Exception {
    super(conf);
    initialize();
  }

  private void initialize() throws Exception {
    if (hbAdmin == null) {
      // make a copy, just to be sure we're not overriding someone else's config
      setConf(HBaseConfiguration.create(getConf()));
      Configuration conf = getConf();
      // disable blockcache for tool invocation, see HBASE-10500
      conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
      this.hbAdmin = new HBaseAdmin(conf);
      this.userProvider = UserProvider.instantiate(conf);
      this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
      assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
      maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
    }
  }

  private void usage() {
    System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" 161 + " Note: if you set this to 'no', then the target table must already exist in HBase\n" 162 + "\n"); 163 } 164 165 private static interface BulkHFileVisitor<TFamily> { bulkFamily(final byte[] familyName)166 TFamily bulkFamily(final byte[] familyName) 167 throws IOException; bulkHFile(final TFamily family, final FileStatus hfileStatus)168 void bulkHFile(final TFamily family, final FileStatus hfileStatus) 169 throws IOException; 170 } 171 172 /** 173 * Iterate over the bulkDir hfiles. 174 * Skip reference, HFileLink, files starting with "_" and non-valid hfiles. 175 */ visitBulkHFiles(final FileSystem fs, final Path bulkDir, final BulkHFileVisitor<TFamily> visitor)176 private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir, 177 final BulkHFileVisitor<TFamily> visitor) throws IOException { 178 visitBulkHFiles(fs, bulkDir, visitor, true); 179 } 180 181 /** 182 * Iterate over the bulkDir hfiles. 183 * Skip reference, HFileLink, files starting with "_". 184 * Check and skip non-valid hfiles by default, or skip this validation by setting 185 * 'hbase.loadincremental.validate.hfile' to false. 186 */ visitBulkHFiles(final FileSystem fs, final Path bulkDir, final BulkHFileVisitor<TFamily> visitor, final boolean validateHFile)187 private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir, 188 final BulkHFileVisitor<TFamily> visitor, final boolean validateHFile) throws IOException { 189 if (!fs.exists(bulkDir)) { 190 throw new FileNotFoundException("Bulkload dir " + bulkDir + " not found"); 191 } 192 193 FileStatus[] familyDirStatuses = fs.listStatus(bulkDir); 194 if (familyDirStatuses == null) { 195 throw new FileNotFoundException("No families found in " + bulkDir); 196 } 197 198 for (FileStatus familyStat : familyDirStatuses) { 199 if (!familyStat.isDirectory()) { 200 LOG.warn("Skipping non-directory " + familyStat.getPath()); 201 continue; 202 } 203 Path familyDir = familyStat.getPath(); 204 byte[] familyName = familyDir.getName().getBytes(); 205 TFamily family = visitor.bulkFamily(familyName); 206 207 FileStatus[] hfileStatuses = fs.listStatus(familyDir); 208 for (FileStatus hfileStatus : hfileStatuses) { 209 if (!fs.isFile(hfileStatus.getPath())) { 210 LOG.warn("Skipping non-file " + hfileStatus); 211 continue; 212 } 213 214 Path hfile = hfileStatus.getPath(); 215 // Skip "_", reference, HFileLink 216 String fileName = hfile.getName(); 217 if (fileName.startsWith("_")) { 218 continue; 219 } 220 if (StoreFileInfo.isReference(fileName)) { 221 LOG.warn("Skipping reference " + fileName); 222 continue; 223 } 224 if (HFileLink.isHFileLink(fileName)) { 225 LOG.warn("Skipping HFileLink " + fileName); 226 continue; 227 } 228 229 // Validate HFile Format if needed 230 if (validateHFile) { 231 try { 232 if (!HFile.isHFileFormat(fs, hfile)) { 233 LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping"); 234 continue; 235 } 236 } catch (FileNotFoundException e) { 237 LOG.warn("the file " + hfile + " was removed"); 238 continue; 239 } 240 } 241 242 visitor.bulkHFile(family, hfileStatus); 243 } 244 } 245 } 246 247 /** 248 * Represents an HFile waiting to be loaded. An queue is used 249 * in this class in order to support the case where a region has 250 * split during the process of the load. When this happens, 251 * the HFile is split into two physical parts across the new 252 * region boundary, and each part is added back into the queue. 
   * The import process finishes when the queue is empty.
   */
  static class LoadQueueItem {
    final byte[] family;
    final Path hfilePath;

    public LoadQueueItem(byte[] family, Path hfilePath) {
      this.family = family;
      this.hfilePath = hfilePath;
    }

    @Override
    public String toString() {
      return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString();
    }
  }

  /**
   * Walk the given directory for all HFiles, and add them to the passed queue.
   */
  private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
      final boolean validateHFile) throws IOException {
    fs = hfofDir.getFileSystem(getConf());
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
      @Override
      public byte[] bulkFamily(final byte[] familyName) {
        return familyName;
      }
      @Override
      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
        long length = hfile.getLen();
        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
            HConstants.DEFAULT_MAX_FILE_SIZE)) {
          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " +
              length + " bytes can be problematic as it may lead to oversplitting.");
        }
        ret.add(new LoadQueueItem(family, hfile.getPath()));
      }
    }, validateHFile);
  }

  /**
   * Perform a bulk load of the given directory into the given
   * pre-existing table.  This method is not thread-safe.
   *
   * @param hfofDir the directory that was provided as the output path
   *   of a job using HFileOutputFormat
   * @param table the table to load into
   * @throws TableNotFoundException if table does not yet exist
   */
  @SuppressWarnings("deprecation")
  public void doBulkLoad(Path hfofDir, final HTable table)
    throws TableNotFoundException, IOException
  {
    Admin admin = null;
    Table t = table;
    Connection conn = table.getConnection();
    boolean closeConnWhenFinished = false;
    try {
      if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
        LOG.warn("managed connection cannot be used for bulkload. Creating unmanaged connection.");
        // can only use unmanaged connections from here on out.
        conn = ConnectionFactory.createConnection(table.getConfiguration());
        t = conn.getTable(table.getName());
        closeConnWhenFinished = true;
        if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
          throw new RuntimeException("Failed to create unmanaged connection.");
        }
        admin = conn.getAdmin();
      } else {
        admin = conn.getAdmin();
      }
      try (RegionLocator rl = conn.getRegionLocator(t.getName())) {
        doBulkLoad(hfofDir, admin, t, rl);
      }
    } finally {
      if (admin != null) admin.close();
      if (closeConnWhenFinished) {
        t.close();
        conn.close();
      }
    }
  }

  /**
   * Perform a bulk load of the given directory into the given
   * pre-existing table.  This method is not thread-safe.
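   *
   * <p>A minimal calling sketch, assuming an open {@code Connection conn} and a
   * constructed {@code LoadIncrementalHFiles loader} (names here are
   * illustrative only):
   * <pre>{@code
   * try (Table t = conn.getTable(tableName);
   *      RegionLocator rl = conn.getRegionLocator(tableName);
   *      Admin admin = conn.getAdmin()) {
   *   loader.doBulkLoad(new Path("/bulk/output"), admin, t, rl);
   * }
   * }</pre>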
   *
   * @param hfofDir the directory that was provided as the output path
   *   of a job using HFileOutputFormat
   * @param table the table to load into
   * @throws TableNotFoundException if table does not yet exist
   */
  @SuppressWarnings("deprecation")
  public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
      RegionLocator regionLocator) throws TableNotFoundException, IOException {

    if (!admin.isTableAvailable(regionLocator.getName())) {
      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
    }

    // initialize thread pools
    int nrThreads = getConf().getInt("hbase.loadincremental.threads.max",
      Runtime.getRuntime().availableProcessors());
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
    builder.setNameFormat("LoadIncrementalHFiles-%1$d");
    ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
        60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(),
        builder.build());
    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);

    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
    try {
      /*
       * Checking hfile format is a time-consuming operation, we should have an option to skip
       * this step when bulkloading millions of HFiles. See HBASE-13985.
       */
      boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
      if (!validateHFile) {
        LOG.warn("You are skipping HFile validation; this might cause data loss if the files " +
            "are not correct. If you fail to read data from your table after using this " +
            "option, consider removing the files and bulkloading again without this option. " +
            "See HBASE-13985");
      }
      discoverLoadQueue(queue, hfofDir, validateHFile);
      // check whether there are invalid family names in the HFiles to be bulkloaded
      Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
      ArrayList<String> familyNames = new ArrayList<String>(families.size());
      for (HColumnDescriptor family : families) {
        familyNames.add(family.getNameAsString());
      }
      ArrayList<String> unmatchedFamilies = new ArrayList<String>();
      Iterator<LoadQueueItem> queueIter = queue.iterator();
      while (queueIter.hasNext()) {
        LoadQueueItem lqi = queueIter.next();
        String familyNameInHFile = Bytes.toString(lqi.family);
        if (!familyNames.contains(familyNameInHFile)) {
          unmatchedFamilies.add(familyNameInHFile);
        }
      }
      if (unmatchedFamilies.size() > 0) {
        String msg =
            "Unmatched family names in HFiles to be bulkloaded: "
            + unmatchedFamilies + "; valid family names of table "
            + table.getName() + " are: " + familyNames;
        LOG.error(msg);
        throw new IOException(msg);
      }
      int count = 0;

      if (queue.isEmpty()) {
        LOG.warn("Bulk load operation did not find any files to load in " +
            "directory " + hfofDir.toUri() + ".  Does it contain files in " +
            "subdirectories that correspond to column family names?");
        return;
      }

      // If using secure bulk load, get source delegation token, and
      // prepare staging directory and token
      // fs is the source filesystem
      fsDelegationToken.acquireDelegationToken(fs);
      if (isSecureBulkLoadEndpointAvailable()) {
        bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
      }

      // Assumes that region splits can happen while this occurs.
      while (!queue.isEmpty()) {
        // need to reload split keys each iteration.
        final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
        if (count != 0) {
          LOG.info("Split occurred while grouping HFiles, retry attempt " +
              count + " with " + queue.size() + " files remaining to group or split");
        }

        int maxRetries = getConf().getInt("hbase.bulkload.retries.number", 10);
        maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
        if (maxRetries != 0 && count >= maxRetries) {
          throw new IOException("Retried " + count +
              " times without completing, bailing out");
        }
        count++;

        // Using ByteBuffer for byte[] equality semantics
        Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
            pool, queue, startEndKeys);

        if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
          // Error is logged inside checkHFilesCountPerRegionPerFamily.
          throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
              + " hfiles to one family of one region");
        }

        bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);

        // NOTE: The next iteration's split / group could happen in parallel to
        // atomic bulkloads assuming that there are splits and no merges, and
        // that we can atomically pull out the groups we want to retry.
      }

    } finally {
      fsDelegationToken.releaseDelegationToken();
      if (bulkToken != null) {
        new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
      }
      pool.shutdown();
      if (queue != null && !queue.isEmpty()) {
        StringBuilder err = new StringBuilder();
        err.append("-------------------------------------------------\n");
        err.append("Bulk load aborted with some files not yet loaded:\n");
        err.append("-------------------------------------------------\n");
        for (LoadQueueItem q : queue) {
          err.append("  ").append(q.hfilePath).append('\n');
        }
        LOG.error(err);
      }
    }

    if (queue != null && !queue.isEmpty()) {
      throw new RuntimeException("Bulk load aborted with some files not yet loaded. "
          + "Please check log for more details.");
    }
  }

  /**
   * This takes the LQI's grouped by likely regions and attempts to bulk load
   * them.  Any failures are re-queued for another pass with the
   * groupOrSplitPhase.
   */
  protected void bulkLoadPhase(final Table table, final Connection conn,
      ExecutorService pool, Deque<LoadQueueItem> queue,
      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
    // atomically bulk load the groups.
    Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<Future<List<LoadQueueItem>>>();
    for (Entry<ByteBuffer,
        ? extends Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
      final byte[] first = e.getKey().array();
      final Collection<LoadQueueItem> lqis = e.getValue();

      final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
        @Override
        public List<LoadQueueItem> call() throws Exception {
          List<LoadQueueItem> toRetry =
              tryAtomicRegionLoad(conn, table.getName(), first, lqis);
          return toRetry;
        }
      };
      loadingFutures.add(pool.submit(call));
    }

    // get all the results.
    for (Future<List<LoadQueueItem>> future : loadingFutures) {
      try {
        List<LoadQueueItem> toRetry = future.get();

        // LQIs that are requeued to be regrouped.
        queue.addAll(toRetry);

      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          // At this point something unrecoverable has happened.
          // TODO Implement bulk load recovery
          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
        }
        LOG.error("Unexpected execution exception during bulk load", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during bulk load", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
  }

  private boolean checkHFilesCountPerRegionPerFamily(
      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
    for (Entry<ByteBuffer,
        ? extends Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
      final Collection<LoadQueueItem> lqis = e.getValue();
      // Use a TreeMap with a byte[] comparator: byte[] does not implement
      // content-based hashCode/equals, so a HashMap would compare family
      // keys by reference.
      Map<byte[], MutableInt> filesMap = new TreeMap<byte[], MutableInt>(Bytes.BYTES_COMPARATOR);
      for (LoadQueueItem lqi : lqis) {
        MutableInt count = filesMap.get(lqi.family);
        if (count == null) {
          count = new MutableInt();
          filesMap.put(lqi.family, count);
        }
        count.increment();
        if (count.intValue() > maxFilesPerRegionPerFamily) {
          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily
              + " hfiles to family " + Bytes.toStringBinary(lqi.family)
              + " of region with start key "
              + Bytes.toStringBinary(e.getKey()));
          return false;
        }
      }
    }
    return true;
  }

  /**
   * @return A multimap of {@code startkey -> LoadQueueItem} that groups LQIs
   * by likely bulk load region targets.
   */
  private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final Table table,
      ExecutorService pool, Deque<LoadQueueItem> queue,
      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    // <region start key, LQI> needs to be synchronized only within the scope
    // of this phase, because of the puts that happen in futures.
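    // Multimaps.synchronizedMultimap makes the individual puts issued from
    // the splitting futures thread-safe; the map is only iterated after all
    // of those futures have completed.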
    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);

    // drain LQIs and figure out bulk load groups
    Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
    while (!queue.isEmpty()) {
      final LoadQueueItem item = queue.remove();

      final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
        @Override
        public List<LoadQueueItem> call() throws Exception {
          List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
          return splits;
        }
      };
      splittingFutures.add(pool.submit(call));
    }
    // get all the results.  All grouping and splitting must finish before
    // we can attempt the atomic loads.
    for (Future<List<LoadQueueItem>> lqis : splittingFutures) {
      try {
        List<LoadQueueItem> splits = lqis.get();
        if (splits != null) {
          queue.addAll(splits);
        }
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          LOG.error("IOException during splitting", e1);
          throw (IOException) t; // would have been thrown if not parallelized
        }
        LOG.error("Unexpected execution exception during splitting", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during splitting", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
    return regionGroups;
  }

  // unique file name for the table
  private String getUniqueName() {
    return UUID.randomUUID().toString().replaceAll("-", "");
  }

  protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item,
      final Table table, byte[] startKey,
      byte[] splitKey) throws IOException {
    final Path hfilePath = item.hfilePath;

    // We use a '_' prefix which is ignored when walking directory trees
    // above.
    final String TMP_DIR = "_tmp";
    Path tmpDir = item.hfilePath.getParent();
    if (!tmpDir.getName().equals(TMP_DIR)) {
      tmpDir = new Path(tmpDir, TMP_DIR);
    }

    LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
        "region. Splitting...");

    String uniqueName = getUniqueName();
    HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
    Path topOut = new Path(tmpDir, uniqueName + ".top");
    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
        botOut, topOut);

    FileSystem fs = tmpDir.getFileSystem(getConf());
    fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));

    // Add the daughter HFiles back to the queue so they are retried on the
    // next grouping pass.
    List<LoadQueueItem> lqis = new ArrayList<LoadQueueItem>(2);
    lqis.add(new LoadQueueItem(item.family, botOut));
    lqis.add(new LoadQueueItem(item.family, topOut));

    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
    return lqis;
  }

  /**
   * Attempt to assign the given load queue item into its target region group.
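   * Groups are keyed by the ByteBuffer-wrapped start key of the target region.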
   * If the hfile boundary no longer fits into a region, physically splits
   * the hfile such that the new bottom half will fit, and returns the list of
   * LQI's corresponding to the resultant hfiles.
   *
   * protected for testing
   * @throws IOException
   */
  protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
      final LoadQueueItem item, final Table table,
      final Pair<byte[][], byte[][]> startEndKeys)
      throws IOException {
    final Path hfilePath = item.hfilePath;
    HFile.Reader hfr = HFile.createReader(fs, hfilePath,
        new CacheConfig(getConf()), getConf());
    final byte[] first, last;
    try {
      hfr.loadFileInfo();
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } finally {
      hfr.close();
    }

    LOG.info("Trying to load hfile=" + hfilePath +
        " first=" + Bytes.toStringBinary(first) +
        " last=" + Bytes.toStringBinary(last));
    if (first == null || last == null) {
      assert first == null && last == null;
      // TODO what if this is due to a bad HFile?
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return null;
    }
    if (Bytes.compareTo(first, last) > 0) {
      throw new IllegalArgumentException(
          "Invalid range: " + Bytes.toStringBinary(first) +
          " > " + Bytes.toStringBinary(last));
    }
    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
        Bytes.BYTES_COMPARATOR);
    if (idx < 0) {
      // not on boundary, returns -(insertion index).  Calculate region it
      // would be in.
      idx = -(idx + 1) - 1;
    }
    final int indexForCallable = idx;

    /*
     * We can consider there to be a region hole in the following conditions:
     * 1) if idx < 0, then the first region info is lost;
     * 2) if the endkey of a region is not equal to the startkey of the next region;
     * 3) if the endkey of the last region is not empty.
     */
    if (indexForCallable < 0) {
      throw new IOException("The first region info for table "
          + table.getName()
          + " can't be found in hbase:meta. Please use hbck tool to fix it first.");
    } else if ((indexForCallable == startEndKeys.getFirst().length - 1)
        && !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IOException("The last region info for table "
          + table.getName()
          + " can't be found in hbase:meta. Please use hbck tool to fix it first.");
    } else if (indexForCallable + 1 < startEndKeys.getFirst().length
        && !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
          startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
      throw new IOException("The endkey of one region for table "
          + table.getName()
          + " is not equal to the startkey of the next region in hbase:meta. "
          + "Please use hbck tool to fix it first.");
    }

    boolean lastKeyInRange =
        Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
        Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
    if (!lastKeyInRange) {
      List<LoadQueueItem> lqis = splitStoreFile(item, table,
          startEndKeys.getFirst()[indexForCallable],
          startEndKeys.getSecond()[indexForCallable]);
      return lqis;
    }

    // group regions.
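    // The hfile fits entirely within the region at idx; key the group by that
    // region's start key, wrapped in a ByteBuffer so the byte[] keys compare
    // by content rather than by reference.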
    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
    return null;
  }

  /**
   * @deprecated As of release 0.96
   *   (<a href="https://issues.apache.org/jira/browse/HBASE-9508">HBASE-9508</a>).
   *   This will be removed in HBase 2.0.0.
   *   Use {@link #tryAtomicRegionLoad(Connection, TableName, byte[], Collection)}.
   */
  @Deprecated
  protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
      final byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis)
      throws IOException {
    return tryAtomicRegionLoad(conn, TableName.valueOf(tableName), first, lqis);
  }

  /**
   * Attempts to do an atomic load of many hfiles into a region.  If it fails,
   * it returns a list of hfiles that need to be retried.  If it is successful
   * it will return an empty list.
   *
   * NOTE: To maintain row atomicity guarantees, the region server callable
   * should succeed or fail atomically.
   *
   * Protected for testing.
   *
   * @return empty list if success, list of items to retry on recoverable
   *   failure
   */
  protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
      final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
      throws IOException {
    final List<Pair<byte[], String>> famPaths =
        new ArrayList<Pair<byte[], String>>(lqis.size());
    for (LoadQueueItem lqi : lqis) {
      famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
    }

    final RegionServerCallable<Boolean> svrCallable =
        new RegionServerCallable<Boolean>(conn, tableName, first) {
      @Override
      public Boolean call(int callTimeout) throws Exception {
        SecureBulkLoadClient secureClient = null;
        boolean success = false;

        try {
          LOG.debug("Going to connect to server " + getLocation() + " for row "
              + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
          byte[] regionName = getLocation().getRegionInfo().getRegionName();
          if (!isSecureBulkLoadEndpointAvailable()) {
            success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
          } else {
            try (Table table = conn.getTable(getTableName())) {
              secureClient = new SecureBulkLoadClient(table);
              success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(),
                bulkToken, getLocation().getRegionInfo().getStartKey());
            }
          }
          return success;
        } finally {
          // Best-effort copying of files that might not have been imported
          // from the staging directory back to the original location
          // in the user directory.
          if (secureClient != null && !success) {
            FileSystem targetFs = FileSystem.get(getConf());
            // Check to see if the source and target filesystems are the same.
            // If they are the same filesystem, we will try to move the files
            // back, because previously we moved them to the staging directory.
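            // (A rename cannot cross filesystems, so when source and target
            // differ the files are simply left in the staging directory.)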
            if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
              for (Pair<byte[], String> el : famPaths) {
                Path hfileStagingPath = null;
                Path hfileOrigPath = new Path(el.getSecond());
                try {
                  hfileStagingPath = new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
                      hfileOrigPath.getName());
                  if (targetFs.rename(hfileStagingPath, hfileOrigPath)) {
                    LOG.debug("Moved back file " + hfileOrigPath + " from " +
                        hfileStagingPath);
                  } else if (targetFs.exists(hfileStagingPath)) {
                    LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
                        hfileStagingPath);
                  }
                } catch (Exception ex) {
                  LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
                      hfileStagingPath, ex);
                }
              }
            }
          }
        }
      }
    };

    try {
      List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
      Configuration conf = getConf();
      boolean success = RpcRetryingCallerFactory.instantiate(conf,
          null).<Boolean> newCaller()
          .callWithRetries(svrCallable, Integer.MAX_VALUE);
      if (!success) {
        LOG.warn("Attempt to bulk load region containing "
            + Bytes.toStringBinary(first) + " into table "
            + tableName + " with files " + lqis
            + " failed.  This is recoverable and they will be retried.");
        toRetry.addAll(lqis); // return lqi's to retry
      }
      // success
      return toRetry;
    } catch (IOException e) {
      LOG.error("Encountered unrecoverable error from region server, additional details: "
          + svrCallable.getExceptionMessageAdditionalDetail(), e);
      throw e;
    }
  }

  private boolean isSecureBulkLoadEndpointAvailable() {
    String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
  }

  /**
   * Split a storefile into a top and bottom half, maintaining
   * the metadata, recreating bloom filters, etc.
   */
  static void splitStoreFile(
      Configuration conf, Path inFile,
      HColumnDescriptor familyDesc, byte[] splitKey,
      Path bottomOut, Path topOut) throws IOException
  {
    // Create references selecting the top and bottom halves of the file;
    // the actual rewrite happens in copyHFileHalf.
    Reference topReference = Reference.createTopReference(splitKey);
    Reference bottomReference = Reference.createBottomReference(splitKey);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
  }

  /**
   * Copy half of an HFile into a new HFile.
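   *
   * <p>Which half is copied is selected by the {@link Reference} passed in: the
   * {@link HalfStoreFileReader} only exposes keys on that side of the split
   * key, so a plain scan-and-append produces the half file.  A sketch of the
   * two calls made by {@link #splitStoreFile(Configuration, Path,
   * HColumnDescriptor, byte[], Path, Path)} above:
   * <pre>{@code
   * // top half holds keys >= splitKey, bottom half holds keys < splitKey
   * copyHFileHalf(conf, inFile, topOut, Reference.createTopReference(splitKey), familyDesc);
   * copyHFileHalf(conf, inFile, bottomOut, Reference.createBottomReference(splitKey), familyDesc);
   * }</pre>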
   */
  private static void copyHFileHalf(
      Configuration conf, Path inFile, Path outFile, Reference reference,
      HColumnDescriptor familyDescriptor)
      throws IOException {
    FileSystem fs = inFile.getFileSystem(conf);
    CacheConfig cacheConf = new CacheConfig(conf);
    HalfStoreFileReader halfReader = null;
    StoreFile.Writer halfWriter = null;
    try {
      halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, conf);
      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

      int blocksize = familyDescriptor.getBlocksize();
      Algorithm compression = familyDescriptor.getCompression();
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
      HFileContext hFileContext = new HFileContextBuilder()
          .withCompression(compression)
          .withChecksumType(HStore.getChecksumType(conf))
          .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
          .withBlockSize(blocksize)
          .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding())
          .withIncludesTags(true)
          .build();
      halfWriter = new StoreFile.WriterBuilder(conf, cacheConf,
          fs)
          .withFilePath(outFile)
          .withBloomType(bloomFilterType)
          .withFileContext(hFileContext)
          .build();
      HFileScanner scanner = halfReader.getScanner(false, false, false);
      scanner.seekTo();
      do {
        halfWriter.append(scanner.getKeyValue());
      } while (scanner.next());

      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfWriter != null) halfWriter.close();
      if (halfReader != null) halfReader.close(cacheConf.shouldEvictOnClose());
    }
  }

  private static boolean shouldCopyHFileMetaKey(byte[] key) {
    // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
    if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
      return false;
    }

    return !HFile.isReservedFileInfoKey(key);
  }

  private boolean doesTableExist(TableName tableName) throws Exception {
    return hbAdmin.tableExists(tableName);
  }

  /*
   * Infers region boundaries for a new table.
   * Parameter:
   *   bdryMap is a map that assigns each key an integer in {+1, -1}:
   *     if a key is the start key of a file, it maps to +1;
   *     if a key is the end key of a file, it maps to -1.
   * Algorithm:
   *   1) Traverse the keys in sorted order:
   *      a) Keep a running sum of the values mapped to these keys.
   *      b) Each time the running sum returns to 0, add the start key recorded
   *         when the sum last left 0 to the boundary list.  (The first
   *         zero-crossing is skipped: it marks the end of the first region,
   *         whose start key is not a split point.)
   *   2) Return the boundary list.
   */
  public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
    ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
    int runningValue = 0;
    byte[] currStartKey = null;
    boolean firstBoundary = true;

    for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
      if (runningValue == 0) currStartKey = item.getKey();
      runningValue += item.getValue();
      if (runningValue == 0) {
        if (!firstBoundary) keysArray.add(currStartKey);
        firstBoundary = false;
      }
    }

    return keysArray.toArray(new byte[0][0]);
  }

  /*
   * If the table is created for the first time, then "completebulkload" reads the files twice.
   * More modifications necessary if we want to avoid doing it.
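   *
   * Worked example of the inferBoundaries() pass used here: for hfiles
   * covering key ranges [a,c], [b,e] and [f,g], the boundary map is
   * {a:+1, b:+1, c:-1, e:-1, f:+1, g:-1}.  The running sum returns to zero at
   * 'e' and at 'g'; the first zero-crossing is skipped, so the single
   * inferred split key is 'f' and the table is created with regions
   * [..,f) and [f,..).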
   */
  private void createTable(TableName tableName, String dirPath) throws Exception {
    final Path hfofDir = new Path(dirPath);
    final FileSystem fs = hfofDir.getFileSystem(getConf());

    // Add column families
    // Build a set of keys
    final HTableDescriptor htd = new HTableDescriptor(tableName);
    final TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() {
      @Override
      public HColumnDescriptor bulkFamily(final byte[] familyName) {
        HColumnDescriptor hcd = new HColumnDescriptor(familyName);
        htd.addFamily(hcd);
        return hcd;
      }
      @Override
      public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus)
          throws IOException {
        Path hfile = hfileStatus.getPath();
        HFile.Reader reader = HFile.createReader(fs, hfile,
            new CacheConfig(getConf()), getConf());
        try {
          if (hcd.getCompressionType() != reader.getFileContext().getCompression()) {
            hcd.setCompressionType(reader.getFileContext().getCompression());
            LOG.info("Setting compression " + hcd.getCompressionType().name() +
                " for family " + hcd.toString());
          }
          reader.loadFileInfo();
          byte[] first = reader.getFirstRowKey();
          byte[] last = reader.getLastRowKey();

          LOG.info("Trying to figure out region boundaries hfile=" + hfile +
              " first=" + Bytes.toStringBinary(first) +
              " last=" + Bytes.toStringBinary(last));

          // To eventually infer start key-end key boundaries
          Integer value = map.containsKey(first) ? map.get(first) : 0;
          map.put(first, value + 1);

          value = map.containsKey(last) ? map.get(last) : 0;
          map.put(last, value - 1);
        } finally {
          reader.close();
        }
      }
    });

    byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map);
    this.hbAdmin.createTable(htd, keys);

    LOG.info("Table " + tableName + " is available!");
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      usage();
      return -1;
    }

    initialize();

    String dirPath = args[0];
    TableName tableName = TableName.valueOf(args[1]);

    boolean tableExists = this.doesTableExist(tableName);
    if (!tableExists) {
      if ("yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) {
        this.createTable(tableName, dirPath);
      } else {
        String errorMsg = format("Table '%s' does not exist.", tableName);
        LOG.error(errorMsg);
        throw new TableNotFoundException(errorMsg);
      }
    }

    Path hfofDir = new Path(dirPath);

    try (Connection connection = ConnectionFactory.createConnection(getConf());
        HTable table = (HTable) connection.getTable(tableName)) {
      doBulkLoad(hfofDir, table);
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(), args);
    System.exit(ret);
  }

}