1 /** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 */ 19 package org.apache.hadoop.hbase.regionserver; 20 21 import java.io.IOException; 22 import java.io.InterruptedIOException; 23 import java.security.PrivilegedExceptionAction; 24 import java.util.ArrayList; 25 import java.util.List; 26 import java.util.ListIterator; 27 import java.util.Map; 28 import java.util.concurrent.Callable; 29 import java.util.concurrent.ExecutionException; 30 import java.util.concurrent.Executors; 31 import java.util.concurrent.Future; 32 import java.util.concurrent.ThreadFactory; 33 import java.util.concurrent.ThreadPoolExecutor; 34 import java.util.concurrent.TimeUnit; 35 36 import org.apache.commons.logging.Log; 37 import org.apache.commons.logging.LogFactory; 38 import org.apache.hadoop.fs.Path; 39 import org.apache.hadoop.hbase.classification.InterfaceAudience; 40 import org.apache.hadoop.hbase.HConstants; 41 import org.apache.hadoop.hbase.HRegionInfo; 42 import org.apache.hadoop.hbase.Server; 43 import org.apache.hadoop.hbase.ServerName; 44 import org.apache.hadoop.hbase.MetaTableAccessor; 45 import org.apache.hadoop.hbase.client.HConnection; 46 import org.apache.hadoop.hbase.client.Mutation; 47 import org.apache.hadoop.hbase.client.Put; 48 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; 49 import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination; 50 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 51 import org.apache.hadoop.hbase.security.User; 52 import org.apache.hadoop.hbase.util.Bytes; 53 import org.apache.hadoop.hbase.util.CancelableProgressable; 54 import org.apache.hadoop.hbase.util.ConfigUtil; 55 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 56 import org.apache.hadoop.hbase.util.FSUtils; 57 import org.apache.hadoop.hbase.util.HasThread; 58 import org.apache.hadoop.hbase.util.Pair; 59 import org.apache.hadoop.hbase.util.PairOfSameType; 60 import org.apache.zookeeper.KeeperException; 61 62 import com.google.common.util.concurrent.ThreadFactoryBuilder; 63 64 @InterfaceAudience.Private 65 public class SplitTransactionImpl implements SplitTransaction { 66 private static final Log LOG = LogFactory.getLog(SplitTransaction.class); 67 68 /* 69 * Region to split 70 */ 71 private final HRegion parent; 72 private HRegionInfo hri_a; 73 private HRegionInfo hri_b; 74 private long fileSplitTimeout = 30000; 75 public SplitTransactionCoordination.SplitTransactionDetails std; 76 boolean useZKForAssignment; 77 78 /* 79 * Row to split around 80 */ 81 private final byte [] splitrow; 82 83 /* 84 * Transaction state for listener, only valid during execute and 85 * rollback 86 */ 87 private SplitTransactionPhase currentPhase = SplitTransactionPhase.STARTED; 88 private Server server; 89 private RegionServerServices rsServices; 90 91 public static class JournalEntryImpl implements JournalEntry { 92 private SplitTransactionPhase type; 93 private long timestamp; 94 JournalEntryImpl(SplitTransactionPhase type)95 public JournalEntryImpl(SplitTransactionPhase type) { 96 this(type, EnvironmentEdgeManager.currentTime()); 97 } 98 JournalEntryImpl(SplitTransactionPhase type, long timestamp)99 public JournalEntryImpl(SplitTransactionPhase type, long timestamp) { 100 this.type = type; 101 this.timestamp = timestamp; 102 } 103 104 @Override toString()105 public String toString() { 106 StringBuilder sb = new StringBuilder(); 107 sb.append(type); 108 sb.append(" at "); 109 sb.append(timestamp); 110 return sb.toString(); 111 } 112 113 @Override getPhase()114 public SplitTransactionPhase getPhase() { 115 return type; 116 } 117 118 @Override getTimeStamp()119 public long getTimeStamp() { 120 return timestamp; 121 } 122 } 123 124 /* 125 * Journal of how far the split transaction has progressed. 126 */ 127 private final List<JournalEntry> journal = new ArrayList<JournalEntry>(); 128 129 /** 130 * Listeners 131 */ 132 private final ArrayList<TransactionListener> listeners = new ArrayList<TransactionListener>(); 133 134 /** 135 * Constructor 136 * @param r Region to split 137 * @param splitrow Row to split around 138 */ SplitTransactionImpl(final Region r, final byte [] splitrow)139 public SplitTransactionImpl(final Region r, final byte [] splitrow) { 140 this.parent = (HRegion)r; 141 this.splitrow = splitrow; 142 this.journal.add(new JournalEntryImpl(SplitTransactionPhase.STARTED)); 143 useZKForAssignment = ConfigUtil.useZKForAssignment(parent.getBaseConf()); 144 } 145 transition(SplitTransactionPhase nextPhase)146 private void transition(SplitTransactionPhase nextPhase) throws IOException { 147 transition(nextPhase, false); 148 } 149 transition(SplitTransactionPhase nextPhase, boolean isRollback)150 private void transition(SplitTransactionPhase nextPhase, boolean isRollback) 151 throws IOException { 152 if (!isRollback) { 153 // Add to the journal first, because if the listener throws an exception 154 // we need to roll back starting at 'nextPhase' 155 this.journal.add(new JournalEntryImpl(nextPhase)); 156 } 157 for (int i = 0; i < listeners.size(); i++) { 158 TransactionListener listener = listeners.get(i); 159 if (!isRollback) { 160 listener.transition(this, currentPhase, nextPhase); 161 } else { 162 listener.rollback(this, currentPhase, nextPhase); 163 } 164 } 165 currentPhase = nextPhase; 166 } 167 168 /** 169 * Does checks on split inputs. 170 * @return <code>true</code> if the region is splittable else 171 * <code>false</code> if it is not (e.g. its already closed, etc.). 172 */ prepare()173 public boolean prepare() throws IOException { 174 if (!this.parent.isSplittable()) return false; 175 // Split key can be null if this region is unsplittable; i.e. has refs. 176 if (this.splitrow == null) return false; 177 HRegionInfo hri = this.parent.getRegionInfo(); 178 parent.prepareToSplit(); 179 // Check splitrow. 180 byte [] startKey = hri.getStartKey(); 181 byte [] endKey = hri.getEndKey(); 182 if (Bytes.equals(startKey, splitrow) || 183 !this.parent.getRegionInfo().containsRow(splitrow)) { 184 LOG.info("Split row is not inside region key range or is equal to " + 185 "startkey: " + Bytes.toStringBinary(this.splitrow)); 186 return false; 187 } 188 long rid = getDaughterRegionIdTimestamp(hri); 189 this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid); 190 this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid); 191 192 transition(SplitTransactionPhase.PREPARED); 193 194 return true; 195 } 196 197 /** 198 * Calculate daughter regionid to use. 199 * @param hri Parent {@link HRegionInfo} 200 * @return Daughter region id (timestamp) to use. 201 */ getDaughterRegionIdTimestamp(final HRegionInfo hri)202 private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) { 203 long rid = EnvironmentEdgeManager.currentTime(); 204 // Regionid is timestamp. Can't be less than that of parent else will insert 205 // at wrong location in hbase:meta (See HBASE-710). 206 if (rid < hri.getRegionId()) { 207 LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() + 208 " but current time here is " + rid); 209 rid = hri.getRegionId() + 1; 210 } 211 return rid; 212 } 213 214 private static IOException closedByOtherException = new IOException( 215 "Failed to close region: already closed by another thread"); 216 createDaughters(final Server server, final RegionServerServices services)217 /* package */PairOfSameType<Region> createDaughters(final Server server, 218 final RegionServerServices services) throws IOException { 219 return createDaughters(server, services, null); 220 } 221 222 /** 223 * Prepare the regions and region files. 224 * @param server Hosting server instance. Can be null when testing (won't try 225 * and update in zk if a null server) 226 * @param services Used to online/offline regions. 227 * @param user 228 * @throws IOException If thrown, transaction failed. 229 * Call {@link #rollback(Server, RegionServerServices)} 230 * @return Regions created 231 */ createDaughters(final Server server, final RegionServerServices services, User user)232 /* package */PairOfSameType<Region> createDaughters(final Server server, 233 final RegionServerServices services, User user) throws IOException { 234 LOG.info("Starting split of region " + this.parent); 235 if ((server != null && server.isStopped()) || 236 (services != null && services.isStopping())) { 237 throw new IOException("Server is stopped or stopping"); 238 } 239 assert !this.parent.lock.writeLock().isHeldByCurrentThread(): 240 "Unsafe to hold write lock while performing RPCs"; 241 242 transition(SplitTransactionPhase.BEFORE_PRE_SPLIT_HOOK); 243 244 // Coprocessor callback 245 if (this.parent.getCoprocessorHost() != null) { 246 if (user == null) { 247 // TODO: Remove one of these 248 parent.getCoprocessorHost().preSplit(); 249 parent.getCoprocessorHost().preSplit(splitrow); 250 } else { 251 try { 252 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() { 253 @Override 254 public Void run() throws Exception { 255 parent.getCoprocessorHost().preSplit(); 256 parent.getCoprocessorHost().preSplit(splitrow); 257 return null; 258 } 259 }); 260 } catch (InterruptedException ie) { 261 InterruptedIOException iioe = new InterruptedIOException(); 262 iioe.initCause(ie); 263 throw iioe; 264 } 265 } 266 } 267 268 transition(SplitTransactionPhase.AFTER_PRE_SPLIT_HOOK); 269 270 // If true, no cluster to write meta edits to or to update znodes in. 271 boolean testing = server == null? true: 272 server.getConfiguration().getBoolean("hbase.testing.nocluster", false); 273 this.fileSplitTimeout = testing ? this.fileSplitTimeout : 274 server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout", 275 this.fileSplitTimeout); 276 277 PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing); 278 279 final List<Mutation> metaEntries = new ArrayList<Mutation>(); 280 boolean ret = false; 281 if (this.parent.getCoprocessorHost() != null) { 282 if (user == null) { 283 ret = parent.getCoprocessorHost().preSplitBeforePONR(splitrow, metaEntries); 284 } else { 285 try { 286 ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() { 287 @Override 288 public Boolean run() throws Exception { 289 return parent.getCoprocessorHost().preSplitBeforePONR(splitrow, metaEntries); 290 } 291 }); 292 } catch (InterruptedException ie) { 293 InterruptedIOException iioe = new InterruptedIOException(); 294 iioe.initCause(ie); 295 throw iioe; 296 } 297 } 298 if (ret) { 299 throw new IOException("Coprocessor bypassing region " 300 + this.parent.getRegionInfo().getRegionNameAsString() + " split."); 301 } 302 try { 303 for (Mutation p : metaEntries) { 304 HRegionInfo.parseRegionName(p.getRow()); 305 } 306 } catch (IOException e) { 307 LOG.error("Row key of mutation from coprossor is not parsable as region name." 308 + "Mutations from coprocessor should only for hbase:meta table."); 309 throw e; 310 } 311 } 312 313 // This is the point of no return. Adding subsequent edits to .META. as we 314 // do below when we do the daughter opens adding each to .META. can fail in 315 // various interesting ways the most interesting of which is a timeout 316 // BUT the edits all go through (See HBASE-3872). IF we reach the PONR 317 // then subsequent failures need to crash out this regionserver; the 318 // server shutdown processing should be able to fix-up the incomplete split. 319 // The offlined parent will have the daughters as extra columns. If 320 // we leave the daughter regions in place and do not remove them when we 321 // crash out, then they will have their references to the parent in place 322 // still and the server shutdown fixup of .META. will point to these 323 // regions. 324 // We should add PONR JournalEntry before offlineParentInMeta,so even if 325 // OfflineParentInMeta timeout,this will cause regionserver exit,and then 326 // master ServerShutdownHandler will fix daughter & avoid data loss. (See 327 // HBase-4562). 328 transition(SplitTransactionPhase.PONR); 329 330 // Edit parent in meta. Offlines parent region and adds splita and splitb 331 // as an atomic update. See HBASE-7721. This update to META makes the region 332 // will determine whether the region is split or not in case of failures. 333 // If it is successful, master will roll-forward, if not, master will rollback 334 // and assign the parent region. 335 if (!testing && useZKForAssignment) { 336 if (metaEntries == null || metaEntries.isEmpty()) { 337 MetaTableAccessor.splitRegion(server.getConnection(), 338 parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), 339 daughterRegions.getSecond().getRegionInfo(), server.getServerName(), 340 parent.getTableDesc().getRegionReplication()); 341 } else { 342 offlineParentInMetaAndputMetaEntries(server.getConnection(), 343 parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions 344 .getSecond().getRegionInfo(), server.getServerName(), metaEntries, 345 parent.getTableDesc().getRegionReplication()); 346 } 347 } else if (services != null && !useZKForAssignment) { 348 if (!services.reportRegionStateTransition(TransitionCode.SPLIT_PONR, 349 parent.getRegionInfo(), hri_a, hri_b)) { 350 // Passed PONR, let SSH clean it up 351 throw new IOException("Failed to notify master that split passed PONR: " 352 + parent.getRegionInfo().getRegionNameAsString()); 353 } 354 } 355 return daughterRegions; 356 } 357 stepsBeforePONR(final Server server, final RegionServerServices services, boolean testing)358 public PairOfSameType<Region> stepsBeforePONR(final Server server, 359 final RegionServerServices services, boolean testing) throws IOException { 360 361 if (useCoordinatedStateManager(server)) { 362 if (std == null) { 363 std = 364 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) 365 .getSplitTransactionCoordination().getDefaultDetails(); 366 } 367 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) 368 .getSplitTransactionCoordination().startSplitTransaction(parent, server.getServerName(), 369 hri_a, hri_b); 370 } else if (services != null && !useZKForAssignment) { 371 if (!services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT, 372 parent.getRegionInfo(), hri_a, hri_b)) { 373 throw new IOException("Failed to get ok from master to split " 374 + parent.getRegionInfo().getRegionNameAsString()); 375 } 376 } 377 378 transition(SplitTransactionPhase.SET_SPLITTING); 379 380 if (useCoordinatedStateManager(server)) { 381 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) 382 .getSplitTransactionCoordination().waitForSplitTransaction(services, parent, hri_a, 383 hri_b, std); 384 } 385 386 this.parent.getRegionFileSystem().createSplitsDir(); 387 388 transition(SplitTransactionPhase.CREATE_SPLIT_DIR); 389 390 Map<byte[], List<StoreFile>> hstoreFilesToSplit = null; 391 Exception exceptionToThrow = null; 392 try{ 393 hstoreFilesToSplit = this.parent.close(false); 394 } catch (Exception e) { 395 exceptionToThrow = e; 396 } 397 if (exceptionToThrow == null && hstoreFilesToSplit == null) { 398 // The region was closed by a concurrent thread. We can't continue 399 // with the split, instead we must just abandon the split. If we 400 // reopen or split this could cause problems because the region has 401 // probably already been moved to a different server, or is in the 402 // process of moving to a different server. 403 exceptionToThrow = closedByOtherException; 404 } 405 if (exceptionToThrow != closedByOtherException) { 406 transition(SplitTransactionPhase.CLOSED_PARENT_REGION); 407 } 408 if (exceptionToThrow != null) { 409 if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow; 410 throw new IOException(exceptionToThrow); 411 } 412 if (!testing) { 413 services.removeFromOnlineRegions(this.parent, null); 414 } 415 416 transition(SplitTransactionPhase.OFFLINED_PARENT); 417 418 // TODO: If splitStoreFiles were multithreaded would we complete steps in 419 // less elapsed time? St.Ack 20100920 420 // 421 // splitStoreFiles creates daughter region dirs under the parent splits dir 422 // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will 423 // clean this up. 424 Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit); 425 426 // Log to the journal that we are creating region A, the first daughter 427 // region. We could fail halfway through. If we do, we could have left 428 // stuff in fs that needs cleanup -- a storefile or two. Thats why we 429 // add entry to journal BEFORE rather than AFTER the change. 430 transition(SplitTransactionPhase.STARTED_REGION_A_CREATION); 431 432 assertReferenceFileCount(expectedReferences.getFirst(), 433 this.parent.getRegionFileSystem().getSplitsDir(this.hri_a)); 434 Region a = this.parent.createDaughterRegionFromSplits(this.hri_a); 435 assertReferenceFileCount(expectedReferences.getFirst(), 436 new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName())); 437 438 // Ditto 439 transition(SplitTransactionPhase.STARTED_REGION_B_CREATION); 440 441 assertReferenceFileCount(expectedReferences.getSecond(), 442 this.parent.getRegionFileSystem().getSplitsDir(this.hri_b)); 443 Region b = this.parent.createDaughterRegionFromSplits(this.hri_b); 444 assertReferenceFileCount(expectedReferences.getSecond(), 445 new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName())); 446 447 return new PairOfSameType<Region>(a, b); 448 } 449 assertReferenceFileCount(int expectedReferenceFileCount, Path dir)450 void assertReferenceFileCount(int expectedReferenceFileCount, Path dir) 451 throws IOException { 452 if (expectedReferenceFileCount != 0 && 453 expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(parent.getFilesystem(), 454 dir)) { 455 throw new IOException("Failing split. Expected reference file count isn't equal."); 456 } 457 } 458 459 /** 460 * Perform time consuming opening of the daughter regions. 461 * @param server Hosting server instance. Can be null when testing 462 * @param services Used to online/offline regions. 463 * @param a first daughter region 464 * @param a second daughter region 465 * @throws IOException If thrown, transaction failed. 466 * Call {@link #rollback(Server, RegionServerServices)} 467 */ openDaughters(final Server server, final RegionServerServices services, Region a, Region b)468 /* package */void openDaughters(final Server server, 469 final RegionServerServices services, Region a, Region b) 470 throws IOException { 471 boolean stopped = server != null && server.isStopped(); 472 boolean stopping = services != null && services.isStopping(); 473 // TODO: Is this check needed here? 474 if (stopped || stopping) { 475 LOG.info("Not opening daughters " + 476 b.getRegionInfo().getRegionNameAsString() + 477 " and " + 478 a.getRegionInfo().getRegionNameAsString() + 479 " because stopping=" + stopping + ", stopped=" + stopped); 480 } else { 481 // Open daughters in parallel. 482 DaughterOpener aOpener = new DaughterOpener(server, (HRegion)a); 483 DaughterOpener bOpener = new DaughterOpener(server, (HRegion)b); 484 aOpener.start(); 485 bOpener.start(); 486 try { 487 aOpener.join(); 488 if (aOpener.getException() == null) { 489 transition(SplitTransactionPhase.OPENED_REGION_A); 490 } 491 bOpener.join(); 492 if (bOpener.getException() == null) { 493 transition(SplitTransactionPhase.OPENED_REGION_B); 494 } 495 } catch (InterruptedException e) { 496 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 497 } 498 if (aOpener.getException() != null) { 499 throw new IOException("Failed " + 500 aOpener.getName(), aOpener.getException()); 501 } 502 if (bOpener.getException() != null) { 503 throw new IOException("Failed " + 504 bOpener.getName(), bOpener.getException()); 505 } 506 if (services != null) { 507 try { 508 if (useZKForAssignment) { 509 // add 2nd daughter first (see HBASE-4335) 510 services.postOpenDeployTasks(b); 511 } else if (!services.reportRegionStateTransition(TransitionCode.SPLIT, 512 parent.getRegionInfo(), hri_a, hri_b)) { 513 throw new IOException("Failed to report split region to master: " 514 + parent.getRegionInfo().getShortNameToLog()); 515 } 516 // Should add it to OnlineRegions 517 services.addToOnlineRegions(b); 518 if (useZKForAssignment) { 519 services.postOpenDeployTasks(a); 520 } 521 services.addToOnlineRegions(a); 522 } catch (KeeperException ke) { 523 throw new IOException(ke); 524 } 525 } 526 } 527 } 528 execute(final Server server, final RegionServerServices services)529 public PairOfSameType<Region> execute(final Server server, 530 final RegionServerServices services) 531 throws IOException { 532 if (User.isHBaseSecurityEnabled(parent.getBaseConf())) { 533 LOG.warn("Should use execute(Server, RegionServerServices, User)"); 534 } 535 return execute(server, services, null); 536 } 537 538 /** 539 * Run the transaction. 540 * @param server Hosting server instance. Can be null when testing 541 * @param services Used to online/offline regions. 542 * @throws IOException If thrown, transaction failed. 543 * Call {@link #rollback(Server, RegionServerServices)} 544 * @return Regions created 545 * @throws IOException 546 * @see #rollback(Server, RegionServerServices) 547 */ 548 @Override execute(final Server server, final RegionServerServices services, User user)549 public PairOfSameType<Region> execute(final Server server, 550 final RegionServerServices services, User user) throws IOException { 551 this.server = server; 552 this.rsServices = services; 553 useZKForAssignment = server == null ? true : 554 ConfigUtil.useZKForAssignment(server.getConfiguration()); 555 if (useCoordinatedStateManager(server)) { 556 std = 557 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) 558 .getSplitTransactionCoordination().getDefaultDetails(); 559 } 560 PairOfSameType<Region> regions = createDaughters(server, services, user); 561 if (this.parent.getCoprocessorHost() != null) { 562 if (user == null) { 563 parent.getCoprocessorHost().preSplitAfterPONR(); 564 } else { 565 try { 566 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() { 567 @Override 568 public Void run() throws Exception { 569 parent.getCoprocessorHost().preSplitAfterPONR(); 570 return null; 571 } 572 }); 573 } catch (InterruptedException ie) { 574 InterruptedIOException iioe = new InterruptedIOException(); 575 iioe.initCause(ie); 576 throw iioe; 577 } 578 } 579 } 580 regions = stepsAfterPONR(server, services, regions, user); 581 582 transition(SplitTransactionPhase.COMPLETED); 583 584 return regions; 585 } 586 587 @Deprecated stepsAfterPONR(final Server server, final RegionServerServices services, final PairOfSameType<Region> regions)588 public PairOfSameType<Region> stepsAfterPONR(final Server server, 589 final RegionServerServices services, final PairOfSameType<Region> regions) 590 throws IOException { 591 return stepsAfterPONR(server, services, regions, null); 592 } 593 stepsAfterPONR(final Server server, final RegionServerServices services, final PairOfSameType<Region> regions, User user)594 public PairOfSameType<Region> stepsAfterPONR(final Server server, 595 final RegionServerServices services, final PairOfSameType<Region> regions, User user) 596 throws IOException { 597 openDaughters(server, services, regions.getFirst(), regions.getSecond()); 598 if (useCoordinatedStateManager(server)) { 599 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) 600 .getSplitTransactionCoordination().completeSplitTransaction(services, regions.getFirst(), 601 regions.getSecond(), std, parent); 602 } 603 604 transition(SplitTransactionPhase.BEFORE_POST_SPLIT_HOOK); 605 606 // Coprocessor callback 607 if (parent.getCoprocessorHost() != null) { 608 if (user == null) { 609 this.parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond()); 610 } else { 611 try { 612 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() { 613 @Override 614 public Void run() throws Exception { 615 parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond()); 616 return null; 617 } 618 }); 619 } catch (InterruptedException ie) { 620 InterruptedIOException iioe = new InterruptedIOException(); 621 iioe.initCause(ie); 622 throw iioe; 623 } 624 } 625 } 626 627 transition(SplitTransactionPhase.AFTER_POST_SPLIT_HOOK); 628 629 return regions; 630 } 631 offlineParentInMetaAndputMetaEntries(HConnection hConnection, HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB, ServerName serverName, List<Mutation> metaEntries, int regionReplication)632 private void offlineParentInMetaAndputMetaEntries(HConnection hConnection, 633 HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB, 634 ServerName serverName, List<Mutation> metaEntries, int regionReplication) 635 throws IOException { 636 List<Mutation> mutations = metaEntries; 637 HRegionInfo copyOfParent = new HRegionInfo(parent); 638 copyOfParent.setOffline(true); 639 copyOfParent.setSplit(true); 640 641 //Put for parent 642 Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent); 643 MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB); 644 mutations.add(putParent); 645 646 //Puts for daughters 647 Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA); 648 Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB); 649 650 addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine. 651 addLocation(putB, serverName, 1); 652 mutations.add(putA); 653 mutations.add(putB); 654 655 // Add empty locations for region replicas of daughters so that number of replicas can be 656 // cached whenever the primary region is looked up from meta 657 for (int i = 1; i < regionReplication; i++) { 658 addEmptyLocation(putA, i); 659 addEmptyLocation(putB, i); 660 } 661 662 MetaTableAccessor.mutateMetaTable(hConnection, mutations); 663 } 664 addEmptyLocation(final Put p, int replicaId)665 private static Put addEmptyLocation(final Put p, int replicaId){ 666 p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(replicaId), null); 667 p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getStartCodeColumn(replicaId), 668 null); 669 p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getSeqNumColumn(replicaId), null); 670 return p; 671 } 672 addLocation(final Put p, final ServerName sn, long openSeqNum)673 public Put addLocation(final Put p, final ServerName sn, long openSeqNum) { 674 p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, 675 Bytes.toBytes(sn.getHostAndPort())); 676 p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, 677 Bytes.toBytes(sn.getStartcode())); 678 p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, 679 Bytes.toBytes(openSeqNum)); 680 return p; 681 } 682 683 /* 684 * Open daughter region in its own thread. 685 * If we fail, abort this hosting server. 686 */ 687 class DaughterOpener extends HasThread { 688 private final Server server; 689 private final HRegion r; 690 private Throwable t = null; 691 DaughterOpener(final Server s, final HRegion r)692 DaughterOpener(final Server s, final HRegion r) { 693 super((s == null? "null-services": s.getServerName()) + 694 "-daughterOpener=" + r.getRegionInfo().getEncodedName()); 695 setDaemon(true); 696 this.server = s; 697 this.r = r; 698 } 699 700 /** 701 * @return Null if open succeeded else exception that causes us fail open. 702 * Call it after this thread exits else you may get wrong view on result. 703 */ getException()704 Throwable getException() { 705 return this.t; 706 } 707 708 @Override run()709 public void run() { 710 try { 711 openDaughterRegion(this.server, r); 712 } catch (Throwable t) { 713 this.t = t; 714 } 715 } 716 } 717 718 /** 719 * Open daughter regions, add them to online list and update meta. 720 * @param server 721 * @param daughter 722 * @throws IOException 723 * @throws KeeperException 724 */ openDaughterRegion(final Server server, final HRegion daughter)725 void openDaughterRegion(final Server server, final HRegion daughter) 726 throws IOException, KeeperException { 727 HRegionInfo hri = daughter.getRegionInfo(); 728 LoggingProgressable reporter = server == null ? null 729 : new LoggingProgressable(hri, server.getConfiguration().getLong( 730 "hbase.regionserver.split.daughter.open.log.interval", 10000)); 731 daughter.openHRegion(reporter); 732 } 733 734 static class LoggingProgressable implements CancelableProgressable { 735 private final HRegionInfo hri; 736 private long lastLog = -1; 737 private final long interval; 738 LoggingProgressable(final HRegionInfo hri, final long interval)739 LoggingProgressable(final HRegionInfo hri, final long interval) { 740 this.hri = hri; 741 this.interval = interval; 742 } 743 744 @Override progress()745 public boolean progress() { 746 long now = EnvironmentEdgeManager.currentTime(); 747 if (now - lastLog > this.interval) { 748 LOG.info("Opening " + this.hri.getRegionNameAsString()); 749 this.lastLog = now; 750 } 751 return true; 752 } 753 } 754 useCoordinatedStateManager(final Server server)755 private boolean useCoordinatedStateManager(final Server server) { 756 return server != null && useZKForAssignment && server.getCoordinatedStateManager() != null; 757 } 758 759 /** 760 * Creates reference files for top and bottom half of the 761 * @param hstoreFilesToSplit map of store files to create half file references for. 762 * @return the number of reference files that were created. 763 * @throws IOException 764 */ splitStoreFiles( final Map<byte[], List<StoreFile>> hstoreFilesToSplit)765 private Pair<Integer, Integer> splitStoreFiles( 766 final Map<byte[], List<StoreFile>> hstoreFilesToSplit) 767 throws IOException { 768 if (hstoreFilesToSplit == null) { 769 // Could be null because close didn't succeed -- for now consider it fatal 770 throw new IOException("Close returned empty list of StoreFiles"); 771 } 772 // The following code sets up a thread pool executor with as many slots as 773 // there's files to split. It then fires up everything, waits for 774 // completion and finally checks for any exception 775 int nbFiles = 0; 776 for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) { 777 nbFiles += entry.getValue().size(); 778 } 779 if (nbFiles == 0) { 780 // no file needs to be splitted. 781 return new Pair<Integer, Integer>(0,0); 782 } 783 // Default max #threads to use is the smaller of table's configured number of blocking store 784 // files or the available number of logical cores. 785 int defMaxThreads = Math.min(parent.conf.getInt(HStore.BLOCKING_STOREFILES_KEY, 786 HStore.DEFAULT_BLOCKING_STOREFILE_COUNT), 787 Runtime.getRuntime().availableProcessors()); 788 // Max #threads is the smaller of the number of storefiles or the default max determined above. 789 int maxThreads = Math.min(parent.conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX, 790 defMaxThreads), nbFiles); 791 LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent + 792 " using " + maxThreads + " threads"); 793 ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); 794 builder.setNameFormat("StoreFileSplitter-%1$d"); 795 ThreadFactory factory = builder.build(); 796 ThreadPoolExecutor threadPool = 797 (ThreadPoolExecutor) Executors.newFixedThreadPool(maxThreads, factory); 798 List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles); 799 800 // Split each store file. 801 for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) { 802 for (StoreFile sf: entry.getValue()) { 803 StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf); 804 futures.add(threadPool.submit(sfs)); 805 } 806 } 807 // Shutdown the pool 808 threadPool.shutdown(); 809 810 // Wait for all the tasks to finish 811 try { 812 boolean stillRunning = !threadPool.awaitTermination( 813 this.fileSplitTimeout, TimeUnit.MILLISECONDS); 814 if (stillRunning) { 815 threadPool.shutdownNow(); 816 // wait for the thread to shutdown completely. 817 while (!threadPool.isTerminated()) { 818 Thread.sleep(50); 819 } 820 throw new IOException("Took too long to split the" + 821 " files and create the references, aborting split"); 822 } 823 } catch (InterruptedException e) { 824 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 825 } 826 827 int created_a = 0; 828 int created_b = 0; 829 // Look for any exception 830 for (Future<Pair<Path, Path>> future : futures) { 831 try { 832 Pair<Path, Path> p = future.get(); 833 created_a += p.getFirst() != null ? 1 : 0; 834 created_b += p.getSecond() != null ? 1 : 0; 835 } catch (InterruptedException e) { 836 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 837 } catch (ExecutionException e) { 838 throw new IOException(e); 839 } 840 } 841 842 if (LOG.isDebugEnabled()) { 843 LOG.debug("Split storefiles for region " + this.parent + " Daughter A: " + created_a 844 + " storefiles, Daughter B: " + created_b + " storefiles."); 845 } 846 return new Pair<Integer, Integer>(created_a, created_b); 847 } 848 splitStoreFile(final byte[] family, final StoreFile sf)849 private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf) 850 throws IOException { 851 if (LOG.isDebugEnabled()) { 852 LOG.debug("Splitting started for store file: " + sf.getPath() + " for region: " + 853 this.parent); 854 } 855 HRegionFileSystem fs = this.parent.getRegionFileSystem(); 856 String familyName = Bytes.toString(family); 857 858 Path path_a = 859 fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false, 860 this.parent.getSplitPolicy()); 861 Path path_b = 862 fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true, 863 this.parent.getSplitPolicy()); 864 if (LOG.isDebugEnabled()) { 865 LOG.debug("Splitting complete for store file: " + sf.getPath() + " for region: " + 866 this.parent); 867 } 868 return new Pair<Path,Path>(path_a, path_b); 869 } 870 871 /** 872 * Utility class used to do the file splitting / reference writing 873 * in parallel instead of sequentially. 874 */ 875 class StoreFileSplitter implements Callable<Pair<Path,Path>> { 876 private final byte[] family; 877 private final StoreFile sf; 878 879 /** 880 * Constructor that takes what it needs to split 881 * @param family Family that contains the store file 882 * @param sf which file 883 */ StoreFileSplitter(final byte[] family, final StoreFile sf)884 public StoreFileSplitter(final byte[] family, final StoreFile sf) { 885 this.sf = sf; 886 this.family = family; 887 } 888 call()889 public Pair<Path,Path> call() throws IOException { 890 return splitStoreFile(family, sf); 891 } 892 } 893 894 @Override rollback(final Server server, final RegionServerServices services)895 public boolean rollback(final Server server, final RegionServerServices services) 896 throws IOException { 897 if (User.isHBaseSecurityEnabled(parent.getBaseConf())) { 898 LOG.warn("Should use rollback(Server, RegionServerServices, User)"); 899 } 900 return rollback(server, services, null); 901 } 902 903 /** 904 * @param server Hosting server instance (May be null when testing). 905 * @param services 906 * @throws IOException If thrown, rollback failed. Take drastic action. 907 * @return True if we successfully rolled back, false if we got to the point 908 * of no return and so now need to abort the server to minimize damage. 909 */ 910 @Override 911 @SuppressWarnings("deprecation") rollback(final Server server, final RegionServerServices services, User user)912 public boolean rollback(final Server server, final RegionServerServices services, User user) 913 throws IOException { 914 // Coprocessor callback 915 if (this.parent.getCoprocessorHost() != null) { 916 if (user == null) { 917 this.parent.getCoprocessorHost().preRollBackSplit(); 918 } else { 919 try { 920 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() { 921 @Override 922 public Void run() throws Exception { 923 parent.getCoprocessorHost().preRollBackSplit(); 924 return null; 925 } 926 }); 927 } catch (InterruptedException ie) { 928 InterruptedIOException iioe = new InterruptedIOException(); 929 iioe.initCause(ie); 930 throw iioe; 931 } 932 } 933 } 934 935 boolean result = true; 936 ListIterator<JournalEntry> iterator = 937 this.journal.listIterator(this.journal.size()); 938 // Iterate in reverse. 939 while (iterator.hasPrevious()) { 940 JournalEntry je = iterator.previous(); 941 942 transition(je.getPhase(), true); 943 944 switch(je.getPhase()) { 945 946 case SET_SPLITTING: 947 if (useCoordinatedStateManager(server) && server instanceof HRegionServer) { 948 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) 949 .getSplitTransactionCoordination().clean(this.parent.getRegionInfo()); 950 } else if (services != null && !useZKForAssignment 951 && !services.reportRegionStateTransition(TransitionCode.SPLIT_REVERTED, 952 parent.getRegionInfo(), hri_a, hri_b)) { 953 return false; 954 } 955 break; 956 957 case CREATE_SPLIT_DIR: 958 this.parent.writestate.writesEnabled = true; 959 this.parent.getRegionFileSystem().cleanupSplitsDir(); 960 break; 961 962 case CLOSED_PARENT_REGION: 963 try { 964 // So, this returns a seqid but if we just closed and then reopened, we 965 // should be ok. On close, we flushed using sequenceid obtained from 966 // hosting regionserver so no need to propagate the sequenceid returned 967 // out of initialize below up into regionserver as we normally do. 968 // TODO: Verify. 969 this.parent.initialize(); 970 } catch (IOException e) { 971 LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " + 972 this.parent.getRegionInfo().getRegionNameAsString(), e); 973 throw new RuntimeException(e); 974 } 975 break; 976 977 case STARTED_REGION_A_CREATION: 978 this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a); 979 break; 980 981 case STARTED_REGION_B_CREATION: 982 this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b); 983 break; 984 985 case OFFLINED_PARENT: 986 if (services != null) services.addToOnlineRegions(this.parent); 987 break; 988 989 case PONR: 990 // We got to the point-of-no-return so we need to just abort. Return 991 // immediately. Do not clean up created daughter regions. They need 992 // to be in place so we don't delete the parent region mistakenly. 993 // See HBASE-3872. 994 return false; 995 996 // Informational only cases 997 case STARTED: 998 case PREPARED: 999 case BEFORE_PRE_SPLIT_HOOK: 1000 case AFTER_PRE_SPLIT_HOOK: 1001 case BEFORE_POST_SPLIT_HOOK: 1002 case AFTER_POST_SPLIT_HOOK: 1003 case OPENED_REGION_A: 1004 case OPENED_REGION_B: 1005 case COMPLETED: 1006 break; 1007 1008 default: 1009 throw new RuntimeException("Unhandled journal entry: " + je); 1010 } 1011 } 1012 // Coprocessor callback 1013 if (this.parent.getCoprocessorHost() != null) { 1014 if (user == null) { 1015 this.parent.getCoprocessorHost().postRollBackSplit(); 1016 } else { 1017 try { 1018 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() { 1019 @Override 1020 public Void run() throws Exception { 1021 parent.getCoprocessorHost().postRollBackSplit(); 1022 return null; 1023 } 1024 }); 1025 } catch (InterruptedException ie) { 1026 InterruptedIOException iioe = new InterruptedIOException(); 1027 iioe.initCause(ie); 1028 throw iioe; 1029 } 1030 } 1031 } 1032 return result; 1033 } 1034 getFirstDaughter()1035 HRegionInfo getFirstDaughter() { 1036 return hri_a; 1037 } 1038 getSecondDaughter()1039 HRegionInfo getSecondDaughter() { 1040 return hri_b; 1041 } 1042 1043 @Override getJournal()1044 public List<JournalEntry> getJournal() { 1045 return journal; 1046 } 1047 1048 @Override registerTransactionListener(TransactionListener listener)1049 public SplitTransaction registerTransactionListener(TransactionListener listener) { 1050 listeners.add(listener); 1051 return this; 1052 } 1053 1054 @Override getServer()1055 public Server getServer() { 1056 return server; 1057 } 1058 1059 @Override getRegionServerServices()1060 public RegionServerServices getRegionServerServices() { 1061 return rsServices; 1062 } 1063 } 1064