1 /** 2 * Copyright The Apache Software Foundation 3 * 4 * Licensed to the Apache Software Foundation (ASF) under one or more 5 * contributor license agreements. See the NOTICE file distributed with this 6 * work for additional information regarding copyright ownership. The ASF 7 * licenses this file to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance with the License. 9 * You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 15 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 16 * License for the specific language governing permissions and limitationsME 17 * under the License. 18 */ 19 package org.apache.hadoop.hbase.regionserver; 20 21 import java.io.IOException; 22 import java.io.InterruptedIOException; 23 import java.security.PrivilegedExceptionAction; 24 import java.util.ArrayList; 25 import java.util.List; 26 import java.util.ListIterator; 27 import java.util.Map; 28 29 import org.apache.commons.logging.Log; 30 import org.apache.commons.logging.LogFactory; 31 import org.apache.hadoop.hbase.classification.InterfaceAudience; 32 import org.apache.hadoop.fs.Path; 33 import org.apache.hadoop.hbase.HConstants; 34 import org.apache.hadoop.hbase.HRegionInfo; 35 import org.apache.hadoop.hbase.MetaMutationAnnotation; 36 import org.apache.hadoop.hbase.Server; 37 import org.apache.hadoop.hbase.ServerName; 38 import org.apache.hadoop.hbase.MetaTableAccessor; 39 import org.apache.hadoop.hbase.client.Delete; 40 import org.apache.hadoop.hbase.client.HConnection; 41 import org.apache.hadoop.hbase.client.Mutation; 42 import org.apache.hadoop.hbase.client.Put; 43 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; 44 import org.apache.hadoop.hbase.coordination.RegionMergeCoordination.RegionMergeDetails; 45 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 46 import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl.LoggingProgressable; 47 import org.apache.hadoop.hbase.security.User; 48 import org.apache.hadoop.hbase.util.Bytes; 49 import org.apache.hadoop.hbase.util.ConfigUtil; 50 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 51 import org.apache.hadoop.hbase.util.Pair; 52 import org.apache.zookeeper.KeeperException; 53 54 @InterfaceAudience.Private 55 public class RegionMergeTransactionImpl implements RegionMergeTransaction { 56 private static final Log LOG = LogFactory.getLog(RegionMergeTransactionImpl.class); 57 58 // Merged region info 59 private HRegionInfo mergedRegionInfo; 60 // region_a sorts before region_b 61 private final HRegion region_a; 62 private final HRegion region_b; 63 // merges dir is under region_a 64 private final Path mergesdir; 65 // We only merge adjacent regions if forcible is false 66 private final boolean forcible; 67 private boolean useCoordinationForAssignment; 68 private final long masterSystemTime; 69 70 /* 71 * Transaction state for listener, only valid during execute and 72 * rollback 73 */ 74 private RegionMergeTransactionPhase currentPhase = RegionMergeTransactionPhase.STARTED; 75 private Server server; 76 private RegionServerServices rsServices; 77 78 public static class JournalEntryImpl implements JournalEntry { 79 private RegionMergeTransactionPhase type; 80 private long timestamp; 81 JournalEntryImpl(RegionMergeTransactionPhase type)82 public JournalEntryImpl(RegionMergeTransactionPhase type) { 83 this(type, EnvironmentEdgeManager.currentTime()); 84 } 85 JournalEntryImpl(RegionMergeTransactionPhase type, long timestamp)86 public JournalEntryImpl(RegionMergeTransactionPhase type, long timestamp) { 87 this.type = type; 88 this.timestamp = timestamp; 89 } 90 91 @Override toString()92 public String toString() { 93 StringBuilder sb = new StringBuilder(); 94 sb.append(type); 95 sb.append(" at "); 96 sb.append(timestamp); 97 return sb.toString(); 98 } 99 100 @Override getPhase()101 public RegionMergeTransactionPhase getPhase() { 102 return type; 103 } 104 105 @Override getTimeStamp()106 public long getTimeStamp() { 107 return timestamp; 108 } 109 } 110 111 /* 112 * Journal of how far the merge transaction has progressed. 113 */ 114 private final List<JournalEntry> journal = new ArrayList<JournalEntry>(); 115 116 /* 117 * Listeners 118 */ 119 private final ArrayList<TransactionListener> listeners = new ArrayList<TransactionListener>(); 120 121 private static IOException closedByOtherException = new IOException( 122 "Failed to close region: already closed by another thread"); 123 124 private RegionServerCoprocessorHost rsCoprocessorHost = null; 125 126 private RegionMergeDetails rmd; 127 128 /** 129 * Constructor 130 * @param a region a to merge 131 * @param b region b to merge 132 * @param forcible if false, we will only merge adjacent regions 133 */ RegionMergeTransactionImpl(final Region a, final Region b, final boolean forcible)134 public RegionMergeTransactionImpl(final Region a, final Region b, 135 final boolean forcible) { 136 this(a, b, forcible, EnvironmentEdgeManager.currentTime()); 137 } 138 /** 139 * Constructor 140 * @param a region a to merge 141 * @param b region b to merge 142 * @param forcible if false, we will only merge adjacent regions 143 * @param masterSystemTime the time at the master side 144 */ RegionMergeTransactionImpl(final Region a, final Region b, final boolean forcible, long masterSystemTime)145 public RegionMergeTransactionImpl(final Region a, final Region b, 146 final boolean forcible, long masterSystemTime) { 147 if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) { 148 this.region_a = (HRegion)a; 149 this.region_b = (HRegion)b; 150 } else { 151 this.region_a = (HRegion)b; 152 this.region_b = (HRegion)a; 153 } 154 this.forcible = forcible; 155 this.masterSystemTime = masterSystemTime; 156 this.mergesdir = region_a.getRegionFileSystem().getMergesDir(); 157 } 158 transition(RegionMergeTransactionPhase nextPhase)159 private void transition(RegionMergeTransactionPhase nextPhase) throws IOException { 160 transition(nextPhase, false); 161 } 162 transition(RegionMergeTransactionPhase nextPhase, boolean isRollback)163 private void transition(RegionMergeTransactionPhase nextPhase, boolean isRollback) 164 throws IOException { 165 if (!isRollback) { 166 // Add to the journal first, because if the listener throws an exception 167 // we need to roll back starting at 'nextPhase' 168 this.journal.add(new JournalEntryImpl(nextPhase)); 169 } 170 for (int i = 0; i < listeners.size(); i++) { 171 TransactionListener listener = listeners.get(i); 172 if (!isRollback) { 173 listener.transition(this, currentPhase, nextPhase); 174 } else { 175 listener.rollback(this, currentPhase, nextPhase); 176 } 177 } 178 currentPhase = nextPhase; 179 } 180 181 /** 182 * Does checks on merge inputs. 183 * @param services 184 * @return <code>true</code> if the regions are mergeable else 185 * <code>false</code> if they are not (e.g. its already closed, etc.). 186 */ 187 @Override prepare(final RegionServerServices services)188 public boolean prepare(final RegionServerServices services) throws IOException { 189 if (!region_a.getTableDesc().getTableName() 190 .equals(region_b.getTableDesc().getTableName())) { 191 LOG.info("Can't merge regions " + region_a + "," + region_b 192 + " because they do not belong to the same table"); 193 return false; 194 } 195 if (region_a.getRegionInfo().equals(region_b.getRegionInfo())) { 196 LOG.info("Can't merge the same region " + region_a); 197 return false; 198 } 199 if (!forcible && !HRegionInfo.areAdjacent(region_a.getRegionInfo(), 200 region_b.getRegionInfo())) { 201 String msg = "Skip merging " + this.region_a.getRegionInfo().getRegionNameAsString() 202 + " and " + this.region_b.getRegionInfo().getRegionNameAsString() 203 + ", because they are not adjacent."; 204 LOG.info(msg); 205 return false; 206 } 207 if (!this.region_a.isMergeable() || !this.region_b.isMergeable()) { 208 return false; 209 } 210 try { 211 boolean regionAHasMergeQualifier = hasMergeQualifierInMeta(services, 212 region_a.getRegionInfo().getRegionName()); 213 if (regionAHasMergeQualifier || 214 hasMergeQualifierInMeta(services, region_b.getRegionInfo().getRegionName())) { 215 LOG.debug("Region " + (regionAHasMergeQualifier ? 216 region_a.getRegionInfo().getRegionNameAsString() : 217 region_b.getRegionInfo().getRegionNameAsString()) 218 + " is not mergeable because it has merge qualifier in META"); 219 return false; 220 } 221 } catch (IOException e) { 222 LOG.warn("Failed judging whether merge transaction is available for " 223 + region_a.getRegionInfo().getRegionNameAsString() + " and " 224 + region_b.getRegionInfo().getRegionNameAsString(), e); 225 return false; 226 } 227 228 // WARN: make sure there is no parent region of the two merging regions in 229 // hbase:meta If exists, fixing up daughters would cause daughter regions(we 230 // have merged one) online again when we restart master, so we should clear 231 // the parent region to prevent the above case 232 // Since HBASE-7721, we don't need fix up daughters any more. so here do 233 // nothing 234 235 this.mergedRegionInfo = getMergedRegionInfo(region_a.getRegionInfo(), 236 region_b.getRegionInfo()); 237 238 transition(RegionMergeTransactionPhase.PREPARED); 239 return true; 240 } 241 242 /** 243 * Run the transaction. 244 * @param server Hosting server instance. Can be null when testing 245 * @param services Used to online/offline regions. 246 * @throws IOException If thrown, transaction failed. Call 247 * {@link #rollback(Server, RegionServerServices)} 248 * @return merged region 249 * @throws IOException 250 * @see #rollback(Server, RegionServerServices) 251 */ 252 @Override execute(final Server server, final RegionServerServices services)253 public HRegion execute(final Server server, 254 final RegionServerServices services) throws IOException { 255 if (User.isHBaseSecurityEnabled(region_a.getBaseConf())) { 256 LOG.warn("Should use execute(Server, RegionServerServices, User)"); 257 } 258 return execute(server, services, null); 259 } 260 261 @Override execute(final Server server, final RegionServerServices services, User user)262 public HRegion execute(final Server server, final RegionServerServices services, User user) 263 throws IOException { 264 this.server = server; 265 this.rsServices = services; 266 useCoordinationForAssignment = 267 server == null ? true : ConfigUtil.useZKForAssignment(server.getConfiguration()); 268 if (rmd == null) { 269 rmd = server != null && server.getCoordinatedStateManager() != null ? 270 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) 271 .getRegionMergeCoordination().getDefaultDetails() 272 : null; 273 } 274 if (rsCoprocessorHost == null) { 275 rsCoprocessorHost = server != null ? 276 ((HRegionServer) server).getRegionServerCoprocessorHost() : null; 277 } 278 final HRegion mergedRegion = createMergedRegion(server, services, user); 279 if (rsCoprocessorHost != null) { 280 if (user == null) { 281 rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion); 282 } else { 283 try { 284 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() { 285 @Override 286 public Void run() throws Exception { 287 rsCoprocessorHost.postMergeCommit(region_a, region_b, mergedRegion); 288 return null; 289 } 290 }); 291 } catch (InterruptedException ie) { 292 InterruptedIOException iioe = new InterruptedIOException(); 293 iioe.initCause(ie); 294 throw iioe; 295 } 296 } 297 } 298 stepsAfterPONR(server, services, mergedRegion, user); 299 300 transition(RegionMergeTransactionPhase.COMPLETED); 301 302 return mergedRegion; 303 } 304 305 @Deprecated stepsAfterPONR(final Server server, final RegionServerServices services, final HRegion mergedRegion)306 public void stepsAfterPONR(final Server server, final RegionServerServices services, 307 final HRegion mergedRegion) throws IOException { 308 stepsAfterPONR(server, services, mergedRegion, null); 309 } 310 stepsAfterPONR(final Server server, final RegionServerServices services, final HRegion mergedRegion, User user)311 public void stepsAfterPONR(final Server server, final RegionServerServices services, 312 final HRegion mergedRegion, User user) throws IOException { 313 openMergedRegion(server, services, mergedRegion); 314 if (useCoordination(server)) { 315 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) 316 .getRegionMergeCoordination().completeRegionMergeTransaction(services, 317 mergedRegionInfo, region_a, region_b, rmd, mergedRegion); 318 } 319 if (rsCoprocessorHost != null) { 320 if (user == null) { 321 rsCoprocessorHost.postMerge(region_a, region_b, mergedRegion); 322 } else { 323 try { 324 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() { 325 @Override 326 public Void run() throws Exception { 327 rsCoprocessorHost.postMerge(region_a, region_b, mergedRegion); 328 return null; 329 } 330 }); 331 } catch (InterruptedException ie) { 332 InterruptedIOException iioe = new InterruptedIOException(); 333 iioe.initCause(ie); 334 throw iioe; 335 } 336 } 337 } 338 } 339 340 /** 341 * Prepare the merged region and region files. 342 * @param server Hosting server instance. Can be null when testing 343 * @param services Used to online/offline regions. 344 * @return merged region 345 * @throws IOException If thrown, transaction failed. Call 346 * {@link #rollback(Server, RegionServerServices)} 347 */ createMergedRegion(final Server server, final RegionServerServices services, User user)348 HRegion createMergedRegion(final Server server, 349 final RegionServerServices services, User user) throws IOException { 350 LOG.info("Starting merge of " + region_a + " and " 351 + region_b.getRegionInfo().getRegionNameAsString() + ", forcible=" + forcible); 352 if ((server != null && server.isStopped()) 353 || (services != null && services.isStopping())) { 354 throw new IOException("Server is stopped or stopping"); 355 } 356 357 if (rsCoprocessorHost != null) { 358 boolean ret = false; 359 if (user == null) { 360 ret = rsCoprocessorHost.preMerge(region_a, region_b); 361 } else { 362 try { 363 ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() { 364 @Override 365 public Boolean run() throws Exception { 366 return rsCoprocessorHost.preMerge(region_a, region_b); 367 } 368 }); 369 } catch (InterruptedException ie) { 370 InterruptedIOException iioe = new InterruptedIOException(); 371 iioe.initCause(ie); 372 throw iioe; 373 } 374 } 375 if (ret) { 376 throw new IOException("Coprocessor bypassing regions " + this.region_a + " " 377 + this.region_b + " merge."); 378 } 379 } 380 381 // If true, no cluster to write meta edits to or to use coordination. 382 boolean testing = server == null ? true : server.getConfiguration() 383 .getBoolean("hbase.testing.nocluster", false); 384 385 HRegion mergedRegion = stepsBeforePONR(server, services, testing); 386 387 @MetaMutationAnnotation 388 final List<Mutation> metaEntries = new ArrayList<Mutation>(); 389 if (rsCoprocessorHost != null) { 390 boolean ret = false; 391 if (user == null) { 392 ret = rsCoprocessorHost.preMergeCommit(region_a, region_b, metaEntries); 393 } else { 394 try { 395 ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() { 396 @Override 397 public Boolean run() throws Exception { 398 return rsCoprocessorHost.preMergeCommit(region_a, region_b, metaEntries); 399 } 400 }); 401 } catch (InterruptedException ie) { 402 InterruptedIOException iioe = new InterruptedIOException(); 403 iioe.initCause(ie); 404 throw iioe; 405 } 406 } 407 408 if (ret) { 409 throw new IOException("Coprocessor bypassing regions " + this.region_a + " " 410 + this.region_b + " merge."); 411 } 412 try { 413 for (Mutation p : metaEntries) { 414 HRegionInfo.parseRegionName(p.getRow()); 415 } 416 } catch (IOException e) { 417 LOG.error("Row key of mutation from coprocessor is not parsable as region name." 418 + "Mutations from coprocessor should only be for hbase:meta table.", e); 419 throw e; 420 } 421 } 422 423 // This is the point of no return. Similar with SplitTransaction. 424 // IF we reach the PONR then subsequent failures need to crash out this 425 // regionserver 426 transition(RegionMergeTransactionPhase.PONR); 427 428 // Add merged region and delete region_a and region_b 429 // as an atomic update. See HBASE-7721. This update to hbase:meta makes the region 430 // will determine whether the region is merged or not in case of failures. 431 // If it is successful, master will roll-forward, if not, master will 432 // rollback 433 if (!testing && useCoordinationForAssignment) { 434 if (metaEntries.isEmpty()) { 435 MetaTableAccessor.mergeRegions(server.getConnection(), 436 mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(), 437 server.getServerName(), region_a.getTableDesc().getRegionReplication(), masterSystemTime); 438 } else { 439 mergeRegionsAndPutMetaEntries(server.getConnection(), 440 mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(), 441 server.getServerName(), metaEntries, region_a.getTableDesc().getRegionReplication()); 442 } 443 } else if (services != null && !useCoordinationForAssignment) { 444 if (!services.reportRegionStateTransition(TransitionCode.MERGE_PONR, 445 mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) { 446 // Passed PONR, let SSH clean it up 447 throw new IOException("Failed to notify master that merge passed PONR: " 448 + region_a.getRegionInfo().getRegionNameAsString() + " and " 449 + region_b.getRegionInfo().getRegionNameAsString()); 450 } 451 } 452 return mergedRegion; 453 } 454 mergeRegionsAndPutMetaEntries(HConnection hConnection, HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB, ServerName serverName, List<Mutation> metaEntries, int regionReplication)455 private void mergeRegionsAndPutMetaEntries(HConnection hConnection, 456 HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB, 457 ServerName serverName, List<Mutation> metaEntries, 458 int regionReplication) throws IOException { 459 prepareMutationsForMerge(mergedRegion, regionA, regionB, serverName, metaEntries, 460 regionReplication); 461 MetaTableAccessor.mutateMetaTable(hConnection, metaEntries); 462 } 463 prepareMutationsForMerge(HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB, ServerName serverName, List<Mutation> mutations, int regionReplication)464 public void prepareMutationsForMerge(HRegionInfo mergedRegion, HRegionInfo regionA, 465 HRegionInfo regionB, ServerName serverName, List<Mutation> mutations, 466 int regionReplication) throws IOException { 467 HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion); 468 469 // use the maximum of what master passed us vs local time. 470 long time = Math.max(EnvironmentEdgeManager.currentTime(), masterSystemTime); 471 472 // Put for parent 473 Put putOfMerged = MetaTableAccessor.makePutFromRegionInfo(copyOfMerged, time); 474 putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, 475 regionA.toByteArray()); 476 putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, 477 regionB.toByteArray()); 478 mutations.add(putOfMerged); 479 // Deletes for merging regions 480 Delete deleteA = MetaTableAccessor.makeDeleteFromRegionInfo(regionA, time); 481 Delete deleteB = MetaTableAccessor.makeDeleteFromRegionInfo(regionB, time); 482 mutations.add(deleteA); 483 mutations.add(deleteB); 484 485 // Add empty locations for region replicas of the merged region so that number of replicas 486 // can be cached whenever the primary region is looked up from meta 487 for (int i = 1; i < regionReplication; i++) { 488 addEmptyLocation(putOfMerged, i); 489 } 490 491 // The merged is a new region, openSeqNum = 1 is fine. 492 addLocation(putOfMerged, serverName, 1); 493 } 494 addLocation(final Put p, final ServerName sn, long openSeqNum)495 public Put addLocation(final Put p, final ServerName sn, long openSeqNum) { 496 p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes 497 .toBytes(sn.getHostAndPort())); 498 p.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, Bytes.toBytes(sn 499 .getStartcode())); 500 p.add(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, Bytes.toBytes(openSeqNum)); 501 return p; 502 } 503 addEmptyLocation(final Put p, int replicaId)504 private static Put addEmptyLocation(final Put p, int replicaId) { 505 p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(replicaId), null); 506 p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getStartCodeColumn(replicaId), 507 null); 508 p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getSeqNumColumn(replicaId), null); 509 return p; 510 } 511 stepsBeforePONR(final Server server, final RegionServerServices services, boolean testing)512 public HRegion stepsBeforePONR(final Server server, final RegionServerServices services, 513 boolean testing) throws IOException { 514 if (rmd == null) { 515 rmd = server != null && server.getCoordinatedStateManager() != null ? 516 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) 517 .getRegionMergeCoordination().getDefaultDetails() 518 : null; 519 } 520 521 // If server doesn't have a coordination state manager, don't do coordination actions. 522 if (useCoordination(server)) { 523 try { 524 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) 525 .getRegionMergeCoordination().startRegionMergeTransaction(mergedRegionInfo, 526 server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo()); 527 } catch (IOException e) { 528 throw new IOException("Failed to start region merge transaction for " 529 + this.mergedRegionInfo.getRegionNameAsString(), e); 530 } 531 } else if (services != null && !useCoordinationForAssignment) { 532 if (!services.reportRegionStateTransition(TransitionCode.READY_TO_MERGE, 533 mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) { 534 throw new IOException("Failed to get ok from master to merge " 535 + region_a.getRegionInfo().getRegionNameAsString() + " and " 536 + region_b.getRegionInfo().getRegionNameAsString()); 537 } 538 } 539 transition(RegionMergeTransactionPhase.SET_MERGING); 540 if (useCoordination(server)) { 541 // After creating the merge node, wait for master to transition it 542 // from PENDING_MERGE to MERGING so that we can move on. We want master 543 // knows about it and won't transition any region which is merging. 544 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) 545 .getRegionMergeCoordination().waitForRegionMergeTransaction(services, 546 mergedRegionInfo, region_a, region_b, rmd); 547 } 548 549 this.region_a.getRegionFileSystem().createMergesDir(); 550 transition(RegionMergeTransactionPhase.CREATED_MERGE_DIR); 551 552 Map<byte[], List<StoreFile>> hstoreFilesOfRegionA = closeAndOfflineRegion( 553 services, this.region_a, true, testing); 554 Map<byte[], List<StoreFile>> hstoreFilesOfRegionB = closeAndOfflineRegion( 555 services, this.region_b, false, testing); 556 557 assert hstoreFilesOfRegionA != null && hstoreFilesOfRegionB != null; 558 559 560 // 561 // mergeStoreFiles creates merged region dirs under the region_a merges dir 562 // Nothing to unroll here if failure -- clean up of CREATE_MERGE_DIR will 563 // clean this up. 564 mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB); 565 566 if (useCoordination(server)) { 567 try { 568 // Do the final check in case any merging region is moved somehow. If so, the transition 569 // will fail. 570 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) 571 .getRegionMergeCoordination().confirmRegionMergeTransaction(this.mergedRegionInfo, 572 region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), rmd); 573 } catch (IOException e) { 574 throw new IOException("Failed setting MERGING on " 575 + this.mergedRegionInfo.getRegionNameAsString(), e); 576 } 577 } 578 579 // Log to the journal that we are creating merged region. We could fail 580 // halfway through. If we do, we could have left 581 // stuff in fs that needs cleanup -- a storefile or two. Thats why we 582 // add entry to journal BEFORE rather than AFTER the change. 583 transition(RegionMergeTransactionPhase.STARTED_MERGED_REGION_CREATION); 584 HRegion mergedRegion = createMergedRegionFromMerges(this.region_a, 585 this.region_b, this.mergedRegionInfo); 586 return mergedRegion; 587 } 588 589 /** 590 * Create a merged region from the merges directory under region a. In order 591 * to mock it for tests, place it with a new method. 592 * @param a hri of region a 593 * @param b hri of region b 594 * @param mergedRegion hri of merged region 595 * @return merged HRegion. 596 * @throws IOException 597 */ createMergedRegionFromMerges(final HRegion a, final HRegion b, final HRegionInfo mergedRegion)598 HRegion createMergedRegionFromMerges(final HRegion a, final HRegion b, 599 final HRegionInfo mergedRegion) throws IOException { 600 return a.createMergedRegionFromMerges(mergedRegion, b); 601 } 602 603 /** 604 * Close the merging region and offline it in regionserver 605 * @param services 606 * @param region 607 * @param isRegionA true if it is merging region a, false if it is region b 608 * @param testing true if it is testing 609 * @return a map of family name to list of store files 610 * @throws IOException 611 */ closeAndOfflineRegion( final RegionServerServices services, final HRegion region, final boolean isRegionA, final boolean testing)612 private Map<byte[], List<StoreFile>> closeAndOfflineRegion( 613 final RegionServerServices services, final HRegion region, 614 final boolean isRegionA, final boolean testing) throws IOException { 615 Map<byte[], List<StoreFile>> hstoreFilesToMerge = null; 616 Exception exceptionToThrow = null; 617 try { 618 hstoreFilesToMerge = region.close(false); 619 } catch (Exception e) { 620 exceptionToThrow = e; 621 } 622 if (exceptionToThrow == null && hstoreFilesToMerge == null) { 623 // The region was closed by a concurrent thread. We can't continue 624 // with the merge, instead we must just abandon the merge. If we 625 // reopen or merge this could cause problems because the region has 626 // probably already been moved to a different server, or is in the 627 // process of moving to a different server. 628 exceptionToThrow = closedByOtherException; 629 } 630 if (exceptionToThrow != closedByOtherException) { 631 transition(isRegionA ? RegionMergeTransactionPhase.CLOSED_REGION_A 632 : RegionMergeTransactionPhase.CLOSED_REGION_B); 633 } 634 if (exceptionToThrow != null) { 635 if (exceptionToThrow instanceof IOException) 636 throw (IOException) exceptionToThrow; 637 throw new IOException(exceptionToThrow); 638 } 639 640 if (!testing) { 641 services.removeFromOnlineRegions(region, null); 642 } 643 transition(isRegionA ? RegionMergeTransactionPhase.OFFLINED_REGION_A 644 : RegionMergeTransactionPhase.OFFLINED_REGION_B); 645 return hstoreFilesToMerge; 646 } 647 648 /** 649 * Get merged region info through the specified two regions 650 * @param a merging region A 651 * @param b merging region B 652 * @return the merged region info 653 */ getMergedRegionInfo(final HRegionInfo a, final HRegionInfo b)654 public static HRegionInfo getMergedRegionInfo(final HRegionInfo a, 655 final HRegionInfo b) { 656 long rid = EnvironmentEdgeManager.currentTime(); 657 // Regionid is timestamp. Merged region's id can't be less than that of 658 // merging regions else will insert at wrong location in hbase:meta 659 if (rid < a.getRegionId() || rid < b.getRegionId()) { 660 LOG.warn("Clock skew; merging regions id are " + a.getRegionId() 661 + " and " + b.getRegionId() + ", but current time here is " + rid); 662 rid = Math.max(a.getRegionId(), b.getRegionId()) + 1; 663 } 664 665 byte[] startKey = null; 666 byte[] endKey = null; 667 // Choose the smaller as start key 668 if (a.compareTo(b) <= 0) { 669 startKey = a.getStartKey(); 670 } else { 671 startKey = b.getStartKey(); 672 } 673 // Choose the bigger as end key 674 if (Bytes.equals(a.getEndKey(), HConstants.EMPTY_BYTE_ARRAY) 675 || (!Bytes.equals(b.getEndKey(), HConstants.EMPTY_BYTE_ARRAY) 676 && Bytes.compareTo(a.getEndKey(), b.getEndKey()) > 0)) { 677 endKey = a.getEndKey(); 678 } else { 679 endKey = b.getEndKey(); 680 } 681 682 // Merged region is sorted between two merging regions in META 683 HRegionInfo mergedRegionInfo = new HRegionInfo(a.getTable(), startKey, 684 endKey, false, rid); 685 return mergedRegionInfo; 686 } 687 688 /** 689 * Perform time consuming opening of the merged region. 690 * @param server Hosting server instance. Can be null when testing 691 * @param services Used to online/offline regions. 692 * @param merged the merged region 693 * @throws IOException If thrown, transaction failed. Call 694 * {@link #rollback(Server, RegionServerServices)} 695 */ openMergedRegion(final Server server, final RegionServerServices services, HRegion merged)696 void openMergedRegion(final Server server, 697 final RegionServerServices services, HRegion merged) throws IOException { 698 boolean stopped = server != null && server.isStopped(); 699 boolean stopping = services != null && services.isStopping(); 700 if (stopped || stopping) { 701 LOG.info("Not opening merged region " + merged.getRegionInfo().getRegionNameAsString() 702 + " because stopping=" + stopping + ", stopped=" + stopped); 703 return; 704 } 705 HRegionInfo hri = merged.getRegionInfo(); 706 LoggingProgressable reporter = server == null ? null 707 : new LoggingProgressable(hri, server.getConfiguration().getLong( 708 "hbase.regionserver.regionmerge.open.log.interval", 10000)); 709 merged.openHRegion(reporter); 710 711 if (services != null) { 712 try { 713 if (useCoordinationForAssignment) { 714 services.postOpenDeployTasks(merged); 715 } else if (!services.reportRegionStateTransition(TransitionCode.MERGED, 716 mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) { 717 throw new IOException("Failed to report merged region to master: " 718 + mergedRegionInfo.getShortNameToLog()); 719 } 720 services.addToOnlineRegions(merged); 721 } catch (KeeperException ke) { 722 throw new IOException(ke); 723 } 724 } 725 726 } 727 728 /** 729 * Create reference file(s) of merging regions under the region_a merges dir 730 * @param hstoreFilesOfRegionA 731 * @param hstoreFilesOfRegionB 732 * @throws IOException 733 */ mergeStoreFiles( Map<byte[], List<StoreFile>> hstoreFilesOfRegionA, Map<byte[], List<StoreFile>> hstoreFilesOfRegionB)734 private void mergeStoreFiles( 735 Map<byte[], List<StoreFile>> hstoreFilesOfRegionA, 736 Map<byte[], List<StoreFile>> hstoreFilesOfRegionB) 737 throws IOException { 738 // Create reference file(s) of region A in mergdir 739 HRegionFileSystem fs_a = this.region_a.getRegionFileSystem(); 740 for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionA 741 .entrySet()) { 742 String familyName = Bytes.toString(entry.getKey()); 743 for (StoreFile storeFile : entry.getValue()) { 744 fs_a.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile, 745 this.mergesdir); 746 } 747 } 748 // Create reference file(s) of region B in mergedir 749 HRegionFileSystem fs_b = this.region_b.getRegionFileSystem(); 750 for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionB 751 .entrySet()) { 752 String familyName = Bytes.toString(entry.getKey()); 753 for (StoreFile storeFile : entry.getValue()) { 754 fs_b.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile, 755 this.mergesdir); 756 } 757 } 758 } 759 760 /** 761 * @param server Hosting server instance (May be null when testing). 762 * @param services Services of regionserver, used to online regions. 763 * @throws IOException If thrown, rollback failed. Take drastic action. 764 * @return True if we successfully rolled back, false if we got to the point 765 * of no return and so now need to abort the server to minimize 766 * damage. 767 */ 768 @Override 769 @SuppressWarnings("deprecation") rollback(final Server server, final RegionServerServices services)770 public boolean rollback(final Server server, 771 final RegionServerServices services) throws IOException { 772 if (User.isHBaseSecurityEnabled(region_a.getBaseConf())) { 773 LOG.warn("Should use execute(Server, RegionServerServices, User)"); 774 } 775 return rollback(server, services, null); 776 } 777 778 @Override rollback(final Server server, final RegionServerServices services, User user)779 public boolean rollback(final Server server, 780 final RegionServerServices services, User user) throws IOException { 781 assert this.mergedRegionInfo != null; 782 // Coprocessor callback 783 if (rsCoprocessorHost != null) { 784 if (user == null) { 785 rsCoprocessorHost.preRollBackMerge(region_a, region_b); 786 } else { 787 try { 788 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() { 789 @Override 790 public Void run() throws Exception { 791 rsCoprocessorHost.preRollBackMerge(region_a, region_b); 792 return null; 793 } 794 }); 795 } catch (InterruptedException ie) { 796 InterruptedIOException iioe = new InterruptedIOException(); 797 iioe.initCause(ie); 798 throw iioe; 799 } 800 } 801 } 802 803 boolean result = true; 804 ListIterator<JournalEntry> iterator = this.journal 805 .listIterator(this.journal.size()); 806 // Iterate in reverse. 807 while (iterator.hasPrevious()) { 808 JournalEntry je = iterator.previous(); 809 810 transition(je.getPhase(), true); 811 812 switch (je.getPhase()) { 813 814 case SET_MERGING: 815 if (useCoordination(server)) { 816 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) 817 .getRegionMergeCoordination().clean(this.mergedRegionInfo); 818 } else if (services != null && !useCoordinationForAssignment 819 && !services.reportRegionStateTransition(TransitionCode.MERGE_REVERTED, 820 mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) { 821 return false; 822 } 823 break; 824 825 case CREATED_MERGE_DIR: 826 this.region_a.writestate.writesEnabled = true; 827 this.region_b.writestate.writesEnabled = true; 828 this.region_a.getRegionFileSystem().cleanupMergesDir(); 829 break; 830 831 case CLOSED_REGION_A: 832 try { 833 // So, this returns a seqid but if we just closed and then reopened, 834 // we should be ok. On close, we flushed using sequenceid obtained 835 // from hosting regionserver so no need to propagate the sequenceid 836 // returned out of initialize below up into regionserver as we 837 // normally do. 838 this.region_a.initialize(); 839 } catch (IOException e) { 840 LOG.error("Failed rollbacking CLOSED_REGION_A of region " 841 + this.region_a.getRegionInfo().getRegionNameAsString(), e); 842 throw new RuntimeException(e); 843 } 844 break; 845 846 case OFFLINED_REGION_A: 847 if (services != null) 848 services.addToOnlineRegions(this.region_a); 849 break; 850 851 case CLOSED_REGION_B: 852 try { 853 this.region_b.initialize(); 854 } catch (IOException e) { 855 LOG.error("Failed rollbacking CLOSED_REGION_A of region " 856 + this.region_b.getRegionInfo().getRegionNameAsString(), e); 857 throw new RuntimeException(e); 858 } 859 break; 860 861 case OFFLINED_REGION_B: 862 if (services != null) 863 services.addToOnlineRegions(this.region_b); 864 break; 865 866 case STARTED_MERGED_REGION_CREATION: 867 this.region_a.getRegionFileSystem().cleanupMergedRegion( 868 this.mergedRegionInfo); 869 break; 870 871 case PONR: 872 // We got to the point-of-no-return so we need to just abort. Return 873 // immediately. Do not clean up created merged regions. 874 return false; 875 876 // Informational only cases 877 case STARTED: 878 case PREPARED: 879 case COMPLETED: 880 break; 881 882 default: 883 throw new RuntimeException("Unhandled journal entry: " + je); 884 } 885 } 886 // Coprocessor callback 887 if (rsCoprocessorHost != null) { 888 if (user == null) { 889 rsCoprocessorHost.postRollBackMerge(region_a, region_b); 890 } else { 891 try { 892 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() { 893 @Override 894 public Void run() throws Exception { 895 rsCoprocessorHost.postRollBackMerge(region_a, region_b); 896 return null; 897 } 898 }); 899 } catch (InterruptedException ie) { 900 InterruptedIOException iioe = new InterruptedIOException(); 901 iioe.initCause(ie); 902 throw iioe; 903 } 904 } 905 } 906 907 return result; 908 } 909 910 @Override getMergedRegionInfo()911 public HRegionInfo getMergedRegionInfo() { 912 return this.mergedRegionInfo; 913 } 914 915 // For unit testing. getMergesDir()916 Path getMergesDir() { 917 return this.mergesdir; 918 } 919 useCoordination(final Server server)920 private boolean useCoordination(final Server server) { 921 return server != null && useCoordinationForAssignment 922 && server.getCoordinatedStateManager() != null; 923 } 924 925 926 927 /** 928 * Checks if the given region has merge qualifier in hbase:meta 929 * @param services 930 * @param regionName name of specified region 931 * @return true if the given region has merge qualifier in META.(It will be 932 * cleaned by CatalogJanitor) 933 * @throws IOException 934 */ hasMergeQualifierInMeta(final RegionServerServices services, final byte[] regionName)935 boolean hasMergeQualifierInMeta(final RegionServerServices services, 936 final byte[] regionName) throws IOException { 937 if (services == null) return false; 938 // Get merge regions if it is a merged region and already has merge 939 // qualifier 940 Pair<HRegionInfo, HRegionInfo> mergeRegions = MetaTableAccessor 941 .getRegionsFromMergeQualifier(services.getConnection(), regionName); 942 if (mergeRegions != null && 943 (mergeRegions.getFirst() != null || mergeRegions.getSecond() != null)) { 944 // It has merge qualifier 945 return true; 946 } 947 return false; 948 } 949 950 @Override getJournal()951 public List<JournalEntry> getJournal() { 952 return journal; 953 } 954 955 @Override registerTransactionListener(TransactionListener listener)956 public RegionMergeTransaction registerTransactionListener(TransactionListener listener) { 957 listeners.add(listener); 958 return this; 959 } 960 961 @Override getServer()962 public Server getServer() { 963 return server; 964 } 965 966 @Override getRegionServerServices()967 public RegionServerServices getRegionServerServices() { 968 return rsServices; 969 } 970 971 } 972