1 /** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 */ 19 package org.apache.hadoop.hbase.regionserver; 20 21 import java.io.IOException; 22 import java.io.InterruptedIOException; 23 import java.net.BindException; 24 import java.net.InetSocketAddress; 25 import java.net.UnknownHostException; 26 import java.util.ArrayList; 27 import java.util.Collections; 28 import java.util.HashMap; 29 import java.util.Iterator; 30 import java.util.List; 31 import java.util.Map; 32 import java.util.Map.Entry; 33 import java.util.NavigableMap; 34 import java.util.Set; 35 import java.util.TreeSet; 36 import java.util.concurrent.ConcurrentHashMap; 37 import java.util.concurrent.atomic.AtomicLong; 38 39 import org.apache.commons.logging.Log; 40 import org.apache.commons.logging.LogFactory; 41 import org.apache.hadoop.conf.Configuration; 42 import org.apache.hadoop.hbase.Cell; 43 import org.apache.hadoop.hbase.CellScannable; 44 import org.apache.hadoop.hbase.CellScanner; 45 import org.apache.hadoop.hbase.CellUtil; 46 import org.apache.hadoop.hbase.DoNotRetryIOException; 47 import org.apache.hadoop.hbase.DroppedSnapshotException; 48 import org.apache.hadoop.hbase.HBaseIOException; 49 import org.apache.hadoop.hbase.HConstants; 
50 import org.apache.hadoop.hbase.HRegionInfo; 51 import org.apache.hadoop.hbase.HTableDescriptor; 52 import org.apache.hadoop.hbase.MetaTableAccessor; 53 import org.apache.hadoop.hbase.MultiActionResultTooLarge; 54 import org.apache.hadoop.hbase.NotServingRegionException; 55 import org.apache.hadoop.hbase.ServerName; 56 import org.apache.hadoop.hbase.TableName; 57 import org.apache.hadoop.hbase.UnknownScannerException; 58 import org.apache.hadoop.hbase.classification.InterfaceAudience; 59 import org.apache.hadoop.hbase.client.Append; 60 import org.apache.hadoop.hbase.client.ConnectionUtils; 61 import org.apache.hadoop.hbase.client.Delete; 62 import org.apache.hadoop.hbase.client.Durability; 63 import org.apache.hadoop.hbase.client.Get; 64 import org.apache.hadoop.hbase.client.Increment; 65 import org.apache.hadoop.hbase.client.Mutation; 66 import org.apache.hadoop.hbase.client.Put; 67 import org.apache.hadoop.hbase.client.RegionReplicaUtil; 68 import org.apache.hadoop.hbase.client.Result; 69 import org.apache.hadoop.hbase.client.RowMutations; 70 import org.apache.hadoop.hbase.client.Scan; 71 import org.apache.hadoop.hbase.conf.ConfigurationObserver; 72 import org.apache.hadoop.hbase.coordination.CloseRegionCoordination; 73 import org.apache.hadoop.hbase.coordination.OpenRegionCoordination; 74 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 75 import org.apache.hadoop.hbase.exceptions.MergeRegionException; 76 import org.apache.hadoop.hbase.exceptions.OperationConflictException; 77 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 78 import org.apache.hadoop.hbase.filter.ByteArrayComparable; 79 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 80 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; 81 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; 82 import org.apache.hadoop.hbase.ipc.PriorityFunction; 83 import org.apache.hadoop.hbase.ipc.QosPriority; 84 import 
org.apache.hadoop.hbase.ipc.RpcCallContext; 85 import org.apache.hadoop.hbase.ipc.RpcServer; 86 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 87 import org.apache.hadoop.hbase.ipc.RpcServerInterface; 88 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 89 import org.apache.hadoop.hbase.ipc.ServerRpcController; 90 import org.apache.hadoop.hbase.master.MasterRpcServices; 91 import org.apache.hadoop.hbase.protobuf.ProtobufUtil; 92 import org.apache.hadoop.hbase.protobuf.RequestConverter; 93 import org.apache.hadoop.hbase.protobuf.ResponseConverter; 94 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; 95 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; 96 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse; 97 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; 98 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse; 99 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest; 100 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse; 101 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 102 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 103 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest; 104 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse; 105 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest; 106 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse; 107 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest; 108 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse; 109 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest; 110 import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse; 111 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest; 112 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 113 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse; 114 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 115 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 116 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 117 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest; 118 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse; 119 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest; 120 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse; 121 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest; 122 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse; 123 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 124 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 125 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 126 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 127 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; 128 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest; 129 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse; 130 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; 131 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action; 132 import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 133 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 134 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 135 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; 136 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition; 137 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 138 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 139 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest; 140 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse; 141 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; 142 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse; 143 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; 144 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse; 145 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; 146 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; 147 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; 148 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult; 149 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException; 150 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; 151 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; 152 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair; 153 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo; 154 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; 155 import 
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 156 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics; 157 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; 158 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; 159 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; 160 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; 161 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; 162 import org.apache.hadoop.hbase.quotas.OperationQuota; 163 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; 164 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; 165 import org.apache.hadoop.hbase.regionserver.Region.FlushResult; 166 import org.apache.hadoop.hbase.regionserver.Region.Operation; 167 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 168 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 169 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 170 import org.apache.hadoop.hbase.wal.WAL; 171 import org.apache.hadoop.hbase.regionserver.wal.WALEdit; 172 import org.apache.hadoop.hbase.security.User; 173 import org.apache.hadoop.hbase.util.Bytes; 174 import org.apache.hadoop.hbase.util.Counter; 175 import org.apache.hadoop.hbase.util.DNS; 176 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 177 import org.apache.hadoop.hbase.util.Pair; 178 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 179 import org.apache.hadoop.hbase.util.Strings; 180 import org.apache.hadoop.hbase.wal.WALKey; 181 import org.apache.hadoop.hbase.wal.WALSplitter; 182 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; 183 import org.apache.zookeeper.KeeperException; 184 185 import com.google.common.annotations.VisibleForTesting; 186 import com.google.protobuf.ByteString; 187 import 
com.google.protobuf.Message; 188 import com.google.protobuf.RpcController; 189 import com.google.protobuf.ServiceException; 190 import com.google.protobuf.TextFormat; 191 192 /** 193 * Implements the regionserver RPC services. 194 */ 195 @InterfaceAudience.Private 196 @SuppressWarnings("deprecation") 197 public class RSRpcServices implements HBaseRPCErrorHandler, 198 AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction, 199 ConfigurationObserver { 200 protected static final Log LOG = LogFactory.getLog(RSRpcServices.class); 201 202 /** RPC scheduler to use for the region server. */ 203 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = 204 "hbase.region.server.rpc.scheduler.factory.class"; 205 206 /** 207 * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This 208 * configuration exists to prevent the scenario where a time limit is specified to be so 209 * restrictive that the time limit is reached immediately (before any cells are scanned). 210 */ 211 private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 212 "hbase.region.server.rpc.minimum.scan.time.limit.delta"; 213 /** 214 * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA} 215 */ 216 private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10; 217 218 // Request counter. (Includes requests that are not serviced by regions.) 219 final Counter requestCount = new Counter(); 220 // Server to handle client requests. 
221 final RpcServerInterface rpcServer; 222 final InetSocketAddress isa; 223 224 private final HRegionServer regionServer; 225 private final long maxScannerResultSize; 226 227 // The reference to the priority extraction function 228 private final PriorityFunction priority; 229 230 private final AtomicLong scannerIdGen = new AtomicLong(0L); 231 private final ConcurrentHashMap<String, RegionScannerHolder> scanners = 232 new ConcurrentHashMap<String, RegionScannerHolder>(); 233 234 /** 235 * The lease timeout period for client scanners (milliseconds). 236 */ 237 private final int scannerLeaseTimeoutPeriod; 238 239 /** 240 * The RPC timeout period (milliseconds) 241 */ 242 private final int rpcTimeout; 243 244 /** 245 * The minimum allowable delta to use for the scan limit 246 */ 247 private final long minimumScanTimeLimitDelta; 248 249 /** 250 * Holder class which holds the RegionScanner and nextCallSeq together. 251 */ 252 private static class RegionScannerHolder { 253 private AtomicLong nextCallSeq = new AtomicLong(0); 254 private RegionScanner s; 255 private Region r; 256 RegionScannerHolder(RegionScanner s, Region r)257 public RegionScannerHolder(RegionScanner s, Region r) { 258 this.s = s; 259 this.r = r; 260 } 261 getNextCallSeq()262 private long getNextCallSeq() { 263 return nextCallSeq.get(); 264 } 265 incNextCallSeq()266 private void incNextCallSeq() { 267 nextCallSeq.incrementAndGet(); 268 } 269 rollbackNextCallSeq()270 private void rollbackNextCallSeq() { 271 nextCallSeq.decrementAndGet(); 272 } 273 } 274 275 /** 276 * Instantiated as a scanner lease. 
If the lease times out, the scanner is 277 * closed 278 */ 279 private class ScannerListener implements LeaseListener { 280 private final String scannerName; 281 ScannerListener(final String n)282 ScannerListener(final String n) { 283 this.scannerName = n; 284 } 285 286 @Override leaseExpired()287 public void leaseExpired() { 288 RegionScannerHolder rsh = scanners.remove(this.scannerName); 289 if (rsh != null) { 290 RegionScanner s = rsh.s; 291 LOG.info("Scanner " + this.scannerName + " lease expired on region " 292 + s.getRegionInfo().getRegionNameAsString()); 293 try { 294 Region region = regionServer.getRegion(s.getRegionInfo().getRegionName()); 295 if (region != null && region.getCoprocessorHost() != null) { 296 region.getCoprocessorHost().preScannerClose(s); 297 } 298 299 s.close(); 300 if (region != null && region.getCoprocessorHost() != null) { 301 region.getCoprocessorHost().postScannerClose(s); 302 } 303 } catch (IOException e) { 304 LOG.error("Closing scanner for " 305 + s.getRegionInfo().getRegionNameAsString(), e); 306 } 307 } else { 308 LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" + 309 " scanner found, hence no chance to close that related scanner!"); 310 } 311 } 312 } 313 getResultOrException( final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats)314 private static ResultOrException getResultOrException( 315 final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats) { 316 return getResultOrException(ResponseConverter.buildActionResult(r, stats), index); 317 } 318 getResultOrException(final Exception e, final int index)319 private static ResultOrException getResultOrException(final Exception e, final int index) { 320 return getResultOrException(ResponseConverter.buildActionResult(e), index); 321 } 322 getResultOrException( final ResultOrException.Builder builder, final int index)323 private static ResultOrException getResultOrException( 324 final 
ResultOrException.Builder builder, final int index) { 325 return builder.setIndex(index).build(); 326 } 327 328 /** 329 * Starts the nonce operation for a mutation, if needed. 330 * @param mutation Mutation. 331 * @param nonceGroup Nonce group from the request. 332 * @returns Nonce used (can be NO_NONCE). 333 */ startNonceOperation(final MutationProto mutation, long nonceGroup)334 private long startNonceOperation(final MutationProto mutation, long nonceGroup) 335 throws IOException, OperationConflictException { 336 if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE; 337 boolean canProceed = false; 338 try { 339 canProceed = regionServer.nonceManager.startOperation( 340 nonceGroup, mutation.getNonce(), regionServer); 341 } catch (InterruptedException ex) { 342 throw new InterruptedIOException("Nonce start operation interrupted"); 343 } 344 if (!canProceed) { 345 // TODO: instead, we could convert append/increment to get w/mvcc 346 String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce() 347 + "} on row [" + Bytes.toString(mutation.getRow().toByteArray()) 348 + "] may have already completed"; 349 throw new OperationConflictException(message); 350 } 351 return mutation.getNonce(); 352 } 353 354 /** 355 * Ends nonce operation for a mutation, if needed. 356 * @param mutation Mutation. 357 * @param nonceGroup Nonce group from the request. Always 0 in initial implementation. 358 * @param success Whether the operation for this nonce has succeeded. 
359 */ endNonceOperation(final MutationProto mutation, long nonceGroup, boolean success)360 private void endNonceOperation(final MutationProto mutation, 361 long nonceGroup, boolean success) { 362 if (regionServer.nonceManager != null && mutation.hasNonce()) { 363 regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success); 364 } 365 } 366 367 /** 368 * @return True if current call supports cellblocks 369 */ isClientCellBlockSupport()370 private boolean isClientCellBlockSupport() { 371 RpcCallContext context = RpcServer.getCurrentCall(); 372 return context != null && context.isClientCellBlockSupported(); 373 } 374 addResult(final MutateResponse.Builder builder, final Result result, final PayloadCarryingRpcController rpcc)375 private void addResult(final MutateResponse.Builder builder, 376 final Result result, final PayloadCarryingRpcController rpcc) { 377 if (result == null) return; 378 if (isClientCellBlockSupport()) { 379 builder.setResult(ProtobufUtil.toResultNoData(result)); 380 rpcc.setCellScanner(result.cellScanner()); 381 } else { 382 ClientProtos.Result pbr = ProtobufUtil.toResult(result); 383 builder.setResult(pbr); 384 } 385 } 386 addResults(final ScanResponse.Builder builder, final List<Result> results, final RpcController controller, boolean isDefaultRegion)387 private void addResults(final ScanResponse.Builder builder, final List<Result> results, 388 final RpcController controller, boolean isDefaultRegion) { 389 builder.setStale(!isDefaultRegion); 390 if (results == null || results.isEmpty()) return; 391 if (isClientCellBlockSupport()) { 392 for (Result res : results) { 393 builder.addCellsPerResult(res.size()); 394 builder.addPartialFlagPerResult(res.isPartial()); 395 } 396 ((PayloadCarryingRpcController)controller). 
397 setCellScanner(CellUtil.createCellScanner(results)); 398 } else { 399 for (Result res: results) { 400 ClientProtos.Result pbr = ProtobufUtil.toResult(res); 401 builder.addResults(pbr); 402 } 403 } 404 } 405 406 /** 407 * Mutate a list of rows atomically. 408 * 409 * @param region 410 * @param actions 411 * @param cellScanner if non-null, the mutation data -- the Cell content. 412 * @throws IOException 413 */ mutateRows(final Region region, final List<ClientProtos.Action> actions, final CellScanner cellScanner)414 private ClientProtos.RegionLoadStats mutateRows(final Region region, 415 final List<ClientProtos.Action> actions, 416 final CellScanner cellScanner) throws IOException { 417 if (!region.getRegionInfo().isMetaTable()) { 418 regionServer.cacheFlusher.reclaimMemStoreMemory(); 419 } 420 RowMutations rm = null; 421 for (ClientProtos.Action action: actions) { 422 if (action.hasGet()) { 423 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + 424 action.getGet()); 425 } 426 MutationType type = action.getMutation().getMutateType(); 427 if (rm == null) { 428 rm = new RowMutations(action.getMutation().getRow().toByteArray()); 429 } 430 switch (type) { 431 case PUT: 432 rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner)); 433 break; 434 case DELETE: 435 rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner)); 436 break; 437 default: 438 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); 439 } 440 } 441 region.mutateRow(rm); 442 return ((HRegion)region).getRegionStats(); 443 } 444 445 /** 446 * Mutate a list of rows atomically. 447 * 448 * @param region 449 * @param actions 450 * @param cellScanner if non-null, the mutation data -- the Cell content. 
451 * @param row 452 * @param family 453 * @param qualifier 454 * @param compareOp 455 * @param comparator @throws IOException 456 */ checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions, final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator)457 private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions, 458 final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, 459 CompareOp compareOp, ByteArrayComparable comparator) throws IOException { 460 if (!region.getRegionInfo().isMetaTable()) { 461 regionServer.cacheFlusher.reclaimMemStoreMemory(); 462 } 463 RowMutations rm = null; 464 for (ClientProtos.Action action: actions) { 465 if (action.hasGet()) { 466 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + 467 action.getGet()); 468 } 469 MutationType type = action.getMutation().getMutateType(); 470 if (rm == null) { 471 rm = new RowMutations(action.getMutation().getRow().toByteArray()); 472 } 473 switch (type) { 474 case PUT: 475 rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner)); 476 break; 477 case DELETE: 478 rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner)); 479 break; 480 default: 481 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); 482 } 483 } 484 return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE); 485 } 486 487 /** 488 * Execute an append mutation. 
489 * 490 * @param region 491 * @param m 492 * @param cellScanner 493 * @return result to return to client if default operation should be 494 * bypassed as indicated by RegionObserver, null otherwise 495 * @throws IOException 496 */ append(final Region region, final OperationQuota quota, final MutationProto m, final CellScanner cellScanner, long nonceGroup)497 private Result append(final Region region, final OperationQuota quota, final MutationProto m, 498 final CellScanner cellScanner, long nonceGroup) throws IOException { 499 long before = EnvironmentEdgeManager.currentTime(); 500 Append append = ProtobufUtil.toAppend(m, cellScanner); 501 quota.addMutation(append); 502 Result r = null; 503 if (region.getCoprocessorHost() != null) { 504 r = region.getCoprocessorHost().preAppend(append); 505 } 506 if (r == null) { 507 long nonce = startNonceOperation(m, nonceGroup); 508 boolean success = false; 509 try { 510 r = region.append(append, nonceGroup, nonce); 511 success = true; 512 } finally { 513 endNonceOperation(m, nonceGroup, success); 514 } 515 if (region.getCoprocessorHost() != null) { 516 region.getCoprocessorHost().postAppend(append, r); 517 } 518 } 519 if (regionServer.metricsRegionServer != null) { 520 regionServer.metricsRegionServer.updateAppend( 521 EnvironmentEdgeManager.currentTime() - before); 522 } 523 return r; 524 } 525 526 /** 527 * Execute an increment mutation. 
528 * 529 * @param region 530 * @param mutation 531 * @return the Result 532 * @throws IOException 533 */ increment(final Region region, final OperationQuota quota, final MutationProto mutation, final CellScanner cells, long nonceGroup)534 private Result increment(final Region region, final OperationQuota quota, 535 final MutationProto mutation, final CellScanner cells, long nonceGroup) throws IOException { 536 long before = EnvironmentEdgeManager.currentTime(); 537 Increment increment = ProtobufUtil.toIncrement(mutation, cells); 538 quota.addMutation(increment); 539 Result r = null; 540 if (region.getCoprocessorHost() != null) { 541 r = region.getCoprocessorHost().preIncrement(increment); 542 } 543 if (r == null) { 544 long nonce = startNonceOperation(mutation, nonceGroup); 545 boolean success = false; 546 try { 547 r = region.increment(increment, nonceGroup, nonce); 548 success = true; 549 } finally { 550 endNonceOperation(mutation, nonceGroup, success); 551 } 552 if (region.getCoprocessorHost() != null) { 553 r = region.getCoprocessorHost().postIncrement(increment, r); 554 } 555 } 556 if (regionServer.metricsRegionServer != null) { 557 regionServer.metricsRegionServer.updateIncrement( 558 EnvironmentEdgeManager.currentTime() - before); 559 } 560 return r; 561 } 562 563 /** 564 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when 565 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. 566 * @param region 567 * @param actions 568 * @param cellScanner 569 * @param builder 570 * @param cellsToReturn Could be null. May be allocated in this method. This is what this 571 * method returns as a 'result'. 
* @return <code>cellsToReturn</code>: the list passed in, or a list allocated here when the
   *         passed one was null and a Result had to be returned via cellblock; may be null
   */
  private List<CellScannable> doNonAtomicRegionMutation(final Region region,
      final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
      final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
    // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
    // one at a time, we instead pass them in batch.  Be aware that the corresponding
    // ResultOrException instance that matches each Put or Delete is then added down in the
    // doBatchOp call.  We should be staying aligned though the Put and Delete are deferred/batched
    List<ClientProtos.Action> mutations = null;
    // Response size cap: the smaller of the scanner result size limit and the read quota left.
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
    RpcCallContext context = RpcServer.getCurrentCall();
    IOException sizeIOE = null;
    Object lastBlock = null;
    for (ClientProtos.Action action : actions.getActionList()) {
      ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
      try {
        Result r = null;

        if (context != null
            && context.isRetryImmediatelySupported()
            && (context.getResponseCellSize() > maxQuotaResultSize
              || context.getResponseBlockSize() > maxQuotaResultSize)) {

          // We're storing the exception since the exception and reason string won't
          // change after the response size limit is reached.
          if (sizeIOE == null ) {
            // We don't need the stack unwinding, so don't throw the exception.
            // Throwing will kill the JVM's JIT.
            //
            // Instead just create the exception and then store it.
            sizeIOE = new MultiActionResultTooLarge("Max size exceeded"
                + " CellSize: " + context.getResponseCellSize()
                + " BlockSize: " + context.getResponseBlockSize());

            // Only report the exception once since there's only one request that
            // caused the exception. Otherwise this number will dominate the exceptions count.
            rpcServer.getMetrics().exception(sizeIOE);
          }

          // Now that an exception is known to exist, use it for the response.
          //
          // This will create a copy in the builder.
          resultOrExceptionBuilder = ResultOrException.newBuilder().
              setException(ResponseConverter.buildException(sizeIOE));
          resultOrExceptionBuilder.setIndex(action.getIndex());
          builder.addResultOrException(resultOrExceptionBuilder.build());
          // The action is not executed, so let skipCellsForMutation handle its cells in the
          // shared scanner.
          if (cellScanner != null) {
            skipCellsForMutation(action, cellScanner);
          }
          continue;
        }
        if (action.hasGet()) {
          long before = EnvironmentEdgeManager.currentTime();
          try {
            Get get = ProtobufUtil.toGet(action.getGet());
            r = region.get(get);
          } finally {
            // Get latency is recorded whether or not the get succeeded.
            if (regionServer.metricsRegionServer != null) {
              regionServer.metricsRegionServer.updateGet(
                EnvironmentEdgeManager.currentTime() - before);
            }
          }
        } else if (action.hasServiceCall()) {
          resultOrExceptionBuilder = ResultOrException.newBuilder();
          try {
            Message result = execServiceOnRegion(region, action.getServiceCall());
            ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
                ClientProtos.CoprocessorServiceResult.newBuilder();
            resultOrExceptionBuilder.setServiceResult(
                serviceResultBuilder.setValue(
                  serviceResultBuilder.getValueBuilder()
                    .setName(result.getClass().getName())
                    .setValue(result.toByteString())));
          } catch (IOException ioe) {
            rpcServer.getMetrics().exception(ioe);
            resultOrExceptionBuilder.setException(ResponseConverter.buildException(ioe));
          }
        } else if (action.hasMutation()) {
          MutationType type = action.getMutation().getMutateType();
          if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
              !mutations.isEmpty()) {
            // Flush out any Puts or Deletes already collected.
            doBatchOp(builder, region, quota, mutations, cellScanner);
            mutations.clear();
          }
          switch (type) {
          case APPEND:
            r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
            break;
          case INCREMENT:
            r = increment(region, quota, action.getMutation(), cellScanner,  nonceGroup);
            break;
          case PUT:
          case DELETE:
            // Collect the individual mutations and apply in a batch
            if (mutations == null) {
              mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
            }
            mutations.add(action);
            break;
          default:
            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
          }
        } else {
          throw new HBaseIOException("Unexpected Action type");
        }
        if (r != null) {
          ClientProtos.Result pbResult = null;
          if (isClientCellBlockSupport()) {
            // Cells go back via cellblock; the protobuf result carries no cell data.
            pbResult = ProtobufUtil.toResultNoData(r);
            //  Hard to guess the size here.  Just make a rough guess.
            if (cellsToReturn == null) {
              cellsToReturn = new ArrayList<CellScannable>();
            }
            cellsToReturn.add(r);
          } else {
            pbResult = ProtobufUtil.toResult(r);
          }
          lastBlock = addSize(context, r, lastBlock);
          resultOrExceptionBuilder =
            ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
        }
        // Could get to here and there was no result and no exception.  Presumes we added
        // a Put or Delete to the collecting Mutations List for adding later.  In this
        // case the corresponding ResultOrException instance for the Put or Delete will be added
        // down in the doBatchOp method call rather than up here.
      } catch (IOException ie) {
        rpcServer.getMetrics().exception(ie);
        resultOrExceptionBuilder = ResultOrException.newBuilder().
          setException(ResponseConverter.buildException(ie));
      }
      if (resultOrExceptionBuilder != null) {
        // Propagate index.
        resultOrExceptionBuilder.setIndex(action.getIndex());
        builder.addResultOrException(resultOrExceptionBuilder.build());
      }
    }
    // Finish up any outstanding mutations
    if (mutations != null && !mutations.isEmpty()) {
      doBatchOp(builder, region, quota, mutations, cellScanner);
    }
    return cellsToReturn;
  }

  /**
   * Execute a list of Put/Delete mutations as one batch and add one {@link ResultOrException}
   * per mutation to <code>builder</code>, each tagged with the index of its action.
   *
   * @param builder receives the per-mutation results or exceptions
   * @param region region the batch is applied to
   * @param quota quota each converted mutation is charged against
   * @param mutations actions to apply; any action that is not a PUT is converted as a Delete
   * @param cells cell data handed to the protobuf-to-mutation conversion
   */
  private void doBatchOp(final RegionActionResult.Builder builder, final Region region,
      final OperationQuota quota,
      final List<ClientProtos.Action> mutations, final CellScanner cells) {
    Mutation[] mArray = new Mutation[mutations.size()];
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      int i = 0;
      for (ClientProtos.Action action: mutations) {
        MutationProto m = action.getMutation();
        Mutation mutation;
        if (m.getMutateType() == MutationType.PUT) {
          mutation = ProtobufUtil.toPut(m, cells);
          batchContainsPuts = true;
        } else {
          mutation = ProtobufUtil.toDelete(m, cells);
          batchContainsDelete = true;
        }
        mArray[i++] = mutation;
        quota.addMutation(mutation);
      }

      if (!region.getRegionInfo().isMetaTable()) {
        regionServer.cacheFlusher.reclaimMemStoreMemory();
      }

      OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE,
        HConstants.NO_NONCE);
      // codes[i] is matched with mutations.get(i) to recover the action index.
      for (i = 0; i < codes.length; i++) {
        int index = mutations.get(i).getIndex();
        Exception e = null;
        switch (codes[i].getOperationStatusCode()) {
          case BAD_FAMILY:
            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SANITY_CHECK_FAILURE:
            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          // Any status code not listed explicitly is reported as a non-retriable failure.
          default:
            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SUCCESS:
            builder.addResultOrException(getResultOrException(
              ClientProtos.Result.getDefaultInstance(), index,
                ((HRegion) region).getRegionStats()));
            break;
        }
      }
    } catch (IOException ie) {
      // The same exception is reported for every mutation in the batch.
      for (int i = 0; i < mutations.size(); i++) {
        builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
      }
    }
    // The whole batch's elapsed time is recorded once per mutation kind present in the batch.
    if (regionServer.metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      if (batchContainsPuts) {
        regionServer.metricsRegionServer.updatePut(after - before);
      }
      if (batchContainsDelete) {
        regionServer.metricsRegionServer.updateDelete(after - before);
      }
    }
  }

  /**
   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
798 * @param region 799 * @param mutations 800 * @param replaySeqId 801 * @return an array of OperationStatus which internally contains the OperationStatusCode and the 802 * exceptionMessage if any 803 * @throws IOException 804 */ doReplayBatchOp(final Region region, final List<WALSplitter.MutationReplay> mutations, long replaySeqId)805 private OperationStatus [] doReplayBatchOp(final Region region, 806 final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException { 807 long before = EnvironmentEdgeManager.currentTime(); 808 boolean batchContainsPuts = false, batchContainsDelete = false; 809 try { 810 for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) { 811 WALSplitter.MutationReplay m = it.next(); 812 813 if (m.type == MutationType.PUT) { 814 batchContainsPuts = true; 815 } else { 816 batchContainsDelete = true; 817 } 818 819 NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap(); 820 List<Cell> metaCells = map.get(WALEdit.METAFAMILY); 821 if (metaCells != null && !metaCells.isEmpty()) { 822 for (Cell metaCell : metaCells) { 823 CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell); 824 boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); 825 HRegion hRegion = (HRegion)region; 826 if (compactionDesc != null) { 827 // replay the compaction. 
Remove the files from stores only if we are the primary 828 // region replica (thus own the files) 829 hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica, 830 replaySeqId); 831 continue; 832 } 833 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell); 834 if (flushDesc != null && !isDefaultReplica) { 835 hRegion.replayWALFlushMarker(flushDesc, replaySeqId); 836 continue; 837 } 838 RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell); 839 if (regionEvent != null && !isDefaultReplica) { 840 hRegion.replayWALRegionEventMarker(regionEvent); 841 continue; 842 } 843 BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell); 844 if (bulkLoadEvent != null) { 845 hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent); 846 continue; 847 } 848 } 849 it.remove(); 850 } 851 } 852 requestCount.add(mutations.size()); 853 if (!region.getRegionInfo().isMetaTable()) { 854 regionServer.cacheFlusher.reclaimMemStoreMemory(); 855 } 856 return region.batchReplay(mutations.toArray( 857 new WALSplitter.MutationReplay[mutations.size()]), replaySeqId); 858 } finally { 859 if (regionServer.metricsRegionServer != null) { 860 long after = EnvironmentEdgeManager.currentTime(); 861 if (batchContainsPuts) { 862 regionServer.metricsRegionServer.updatePut(after - before); 863 } 864 if (batchContainsDelete) { 865 regionServer.metricsRegionServer.updateDelete(after - before); 866 } 867 } 868 } 869 } 870 closeAllScanners()871 private void closeAllScanners() { 872 // Close any outstanding scanners. Means they'll get an UnknownScanner 873 // exception next time they come in. 
874 for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) { 875 try { 876 e.getValue().s.close(); 877 } catch (IOException ioe) { 878 LOG.warn("Closing scanner " + e.getKey(), ioe); 879 } 880 } 881 } 882 RSRpcServices(HRegionServer rs)883 public RSRpcServices(HRegionServer rs) throws IOException { 884 regionServer = rs; 885 886 RpcSchedulerFactory rpcSchedulerFactory; 887 try { 888 Class<?> rpcSchedulerFactoryClass = rs.conf.getClass( 889 REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, 890 SimpleRpcSchedulerFactory.class); 891 rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance()); 892 } catch (InstantiationException e) { 893 throw new IllegalArgumentException(e); 894 } catch (IllegalAccessException e) { 895 throw new IllegalArgumentException(e); 896 } 897 // Server to handle client requests. 898 InetSocketAddress initialIsa; 899 InetSocketAddress bindAddress; 900 if(this instanceof MasterRpcServices) { 901 String hostname = getHostname(rs.conf, true); 902 int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT); 903 // Creation of a HSA will force a resolve. 904 initialIsa = new InetSocketAddress(hostname, port); 905 bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port); 906 } else { 907 String hostname = getHostname(rs.conf, false); 908 int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT, 909 HConstants.DEFAULT_REGIONSERVER_PORT); 910 // Creation of a HSA will force a resolve. 911 initialIsa = new InetSocketAddress(hostname, port); 912 bindAddress = new InetSocketAddress( 913 rs.conf.get("hbase.regionserver.ipc.address", hostname), port); 914 } 915 if (initialIsa.getAddress() == null) { 916 throw new IllegalArgumentException("Failed resolve of " + initialIsa); 917 } 918 priority = createPriority(); 919 String name = rs.getProcessName() + "/" + initialIsa.toString(); 920 // Set how many times to retry talking to another server over HConnection. 
921 ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG); 922 try { 923 rpcServer = new RpcServer(rs, name, getServices(), 924 bindAddress, // use final bindAddress for this server. 925 rs.conf, 926 rpcSchedulerFactory.create(rs.conf, this, rs)); 927 } catch (BindException be) { 928 String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT : 929 HConstants.REGIONSERVER_PORT; 930 throw new IOException(be.getMessage() + ". To switch ports use the '" + configName + 931 "' configuration property.", be.getCause() != null ? be.getCause() : be); 932 } 933 934 scannerLeaseTimeoutPeriod = rs.conf.getInt( 935 HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 936 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); 937 maxScannerResultSize = rs.conf.getLong( 938 HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 939 HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); 940 rpcTimeout = rs.conf.getInt( 941 HConstants.HBASE_RPC_TIMEOUT_KEY, 942 HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 943 minimumScanTimeLimitDelta = rs.conf.getLong( 944 REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, 945 DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA); 946 947 InetSocketAddress address = rpcServer.getListenerAddress(); 948 if (address == null) { 949 throw new IOException("Listener channel is closed"); 950 } 951 // Set our address, however we need the final port that was given to rpcServer 952 isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort()); 953 rpcServer.setErrorHandler(this); 954 rs.setName(name); 955 } 956 957 @Override onConfigurationChange(Configuration newConf)958 public void onConfigurationChange(Configuration newConf) { 959 if (rpcServer instanceof ConfigurationObserver) { 960 ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf); 961 } 962 } 963 createPriority()964 protected PriorityFunction createPriority() { 965 return new AnnotationReadingPriorityFunction(this); 966 } 967 
getHostname(Configuration conf, boolean isMaster)968 public static String getHostname(Configuration conf, boolean isMaster) 969 throws UnknownHostException { 970 String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY : 971 HRegionServer.RS_HOSTNAME_KEY); 972 if (hostname == null || hostname.isEmpty()) { 973 String masterOrRS = isMaster ? "master" : "regionserver"; 974 return Strings.domainNamePointerToHostName(DNS.getDefaultHost( 975 conf.get("hbase." + masterOrRS + ".dns.interface", "default"), 976 conf.get("hbase." + masterOrRS + ".dns.nameserver", "default"))); 977 } else { 978 LOG.info("hostname is configured to be " + hostname); 979 return hostname; 980 } 981 } 982 getScanner(long scannerId)983 RegionScanner getScanner(long scannerId) { 984 String scannerIdString = Long.toString(scannerId); 985 RegionScannerHolder scannerHolder = scanners.get(scannerIdString); 986 if (scannerHolder != null) { 987 return scannerHolder.s; 988 } 989 return null; 990 } 991 992 /** 993 * Get the vtime associated with the scanner. 994 * Currently the vtime is the number of "next" calls. 
995 */ getScannerVirtualTime(long scannerId)996 long getScannerVirtualTime(long scannerId) { 997 String scannerIdString = Long.toString(scannerId); 998 RegionScannerHolder scannerHolder = scanners.get(scannerIdString); 999 if (scannerHolder != null) { 1000 return scannerHolder.getNextCallSeq(); 1001 } 1002 return 0L; 1003 } 1004 addScanner(RegionScanner s, Region r)1005 long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException { 1006 long scannerId = this.scannerIdGen.incrementAndGet(); 1007 String scannerName = String.valueOf(scannerId); 1008 1009 RegionScannerHolder existing = 1010 scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r)); 1011 assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!"; 1012 1013 regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod, 1014 new ScannerListener(scannerName)); 1015 return scannerId; 1016 } 1017 1018 /** 1019 * Method to account for the size of retained cells and retained data blocks. 1020 * @return an object that represents the last referenced block from this response. 1021 */ addSize(RpcCallContext context, Result r, Object lastBlock)1022 Object addSize(RpcCallContext context, Result r, Object lastBlock) { 1023 if (context != null && !r.isEmpty()) { 1024 for (Cell c : r.rawCells()) { 1025 context.incrementResponseCellSize(CellUtil.estimatedHeapSizeOf(c)); 1026 // We're using the last block being the same as the current block as 1027 // a proxy for pointing to a new block. This won't be exact. 1028 // If there are multiple gets that bounce back and forth 1029 // Then it's possible that this will over count the size of 1030 // referenced blocks. However it's better to over count and 1031 // use two RPC's than to OOME the RegionServer. 
1032 byte[] valueArray = c.getValueArray(); 1033 if (valueArray != lastBlock) { 1034 context.incrementResponseBlockSize(valueArray.length); 1035 lastBlock = valueArray; 1036 } 1037 } 1038 } 1039 return lastBlock; 1040 } 1041 1042 1043 /** 1044 * Find the HRegion based on a region specifier 1045 * 1046 * @param regionSpecifier the region specifier 1047 * @return the corresponding region 1048 * @throws IOException if the specifier is not null, 1049 * but failed to find the region 1050 */ getRegion( final RegionSpecifier regionSpecifier)1051 Region getRegion( 1052 final RegionSpecifier regionSpecifier) throws IOException { 1053 return regionServer.getRegionByEncodedName(regionSpecifier.getValue().toByteArray(), 1054 ProtobufUtil.getRegionEncodedName(regionSpecifier)); 1055 } 1056 1057 @VisibleForTesting getPriority()1058 public PriorityFunction getPriority() { 1059 return priority; 1060 } 1061 1062 @VisibleForTesting getConfiguration()1063 public Configuration getConfiguration() { 1064 return regionServer.getConfiguration(); 1065 } 1066 getQuotaManager()1067 private RegionServerQuotaManager getQuotaManager() { 1068 return regionServer.getRegionServerQuotaManager(); 1069 } 1070 start()1071 void start() { 1072 rpcServer.start(); 1073 } 1074 stop()1075 void stop() { 1076 closeAllScanners(); 1077 rpcServer.stop(); 1078 } 1079 1080 /** 1081 * Called to verify that this server is up and running. 
1082 * 1083 * @throws IOException 1084 */ checkOpen()1085 protected void checkOpen() throws IOException { 1086 if (regionServer.isAborted()) { 1087 throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting"); 1088 } 1089 if (regionServer.isStopped()) { 1090 throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping"); 1091 } 1092 if (!regionServer.fsOk) { 1093 throw new RegionServerStoppedException("File system not available"); 1094 } 1095 if (!regionServer.isOnline()) { 1096 throw new ServerNotRunningYetException("Server is not running yet"); 1097 } 1098 } 1099 1100 /** 1101 * @return list of blocking services and their security info classes that this server supports 1102 */ getServices()1103 protected List<BlockingServiceAndInterface> getServices() { 1104 List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2); 1105 bssi.add(new BlockingServiceAndInterface( 1106 ClientService.newReflectiveBlockingService(this), 1107 ClientService.BlockingInterface.class)); 1108 bssi.add(new BlockingServiceAndInterface( 1109 AdminService.newReflectiveBlockingService(this), 1110 AdminService.BlockingInterface.class)); 1111 return bssi; 1112 } 1113 getSocketAddress()1114 public InetSocketAddress getSocketAddress() { 1115 return isa; 1116 } 1117 1118 @Override getPriority(RequestHeader header, Message param, User user)1119 public int getPriority(RequestHeader header, Message param, User user) { 1120 return priority.getPriority(header, param, user); 1121 } 1122 1123 @Override getDeadline(RequestHeader header, Message param)1124 public long getDeadline(RequestHeader header, Message param) { 1125 return priority.getDeadline(header, param); 1126 } 1127 1128 /* 1129 * Check if an OOME and, if so, abort immediately to avoid creating more objects. 1130 * 1131 * @param e 1132 * 1133 * @return True if we OOME'd and are aborting. 
1134 */ 1135 @Override checkOOME(final Throwable e)1136 public boolean checkOOME(final Throwable e) { 1137 boolean stop = false; 1138 try { 1139 if (e instanceof OutOfMemoryError 1140 || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError) 1141 || (e.getMessage() != null && e.getMessage().contains( 1142 "java.lang.OutOfMemoryError"))) { 1143 stop = true; 1144 LOG.fatal("Run out of memory; " + getClass().getSimpleName() 1145 + " will abort itself immediately", e); 1146 } 1147 } finally { 1148 if (stop) { 1149 Runtime.getRuntime().halt(1); 1150 } 1151 } 1152 return stop; 1153 } 1154 1155 /** 1156 * Close a region on the region server. 1157 * 1158 * @param controller the RPC controller 1159 * @param request the request 1160 * @throws ServiceException 1161 */ 1162 @Override 1163 @QosPriority(priority=HConstants.ADMIN_QOS) closeRegion(final RpcController controller, final CloseRegionRequest request)1164 public CloseRegionResponse closeRegion(final RpcController controller, 1165 final CloseRegionRequest request) throws ServiceException { 1166 final ServerName sn = (request.hasDestinationServer() ? 1167 ProtobufUtil.toServerName(request.getDestinationServer()) : null); 1168 1169 try { 1170 checkOpen(); 1171 if (request.hasServerStartCode()) { 1172 // check that we are the same server that this RPC is intended for. 
1173 long serverStartCode = request.getServerStartCode(); 1174 if (regionServer.serverName.getStartcode() != serverStartCode) { 1175 throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " + 1176 "different server with startCode: " + serverStartCode + ", this server is: " 1177 + regionServer.serverName)); 1178 } 1179 } 1180 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 1181 1182 // Can be null if we're calling close on a region that's not online 1183 final Region region = regionServer.getFromOnlineRegions(encodedRegionName); 1184 if ((region != null) && (region .getCoprocessorHost() != null)) { 1185 region.getCoprocessorHost().preClose(false); 1186 } 1187 1188 requestCount.increment(); 1189 LOG.info("Close " + encodedRegionName + ", moving to " + sn); 1190 CloseRegionCoordination.CloseRegionDetails crd = regionServer.getCoordinatedStateManager() 1191 .getCloseRegionCoordination().parseFromProtoRequest(request); 1192 1193 boolean closed = regionServer.closeRegion(encodedRegionName, false, crd, sn); 1194 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed); 1195 return builder.build(); 1196 } catch (IOException ie) { 1197 throw new ServiceException(ie); 1198 } 1199 } 1200 1201 /** 1202 * Compact a region on the region server. 
1203 * 1204 * @param controller the RPC controller 1205 * @param request the request 1206 * @throws ServiceException 1207 */ 1208 @Override 1209 @QosPriority(priority=HConstants.ADMIN_QOS) compactRegion(final RpcController controller, final CompactRegionRequest request)1210 public CompactRegionResponse compactRegion(final RpcController controller, 1211 final CompactRegionRequest request) throws ServiceException { 1212 try { 1213 checkOpen(); 1214 requestCount.increment(); 1215 Region region = getRegion(request.getRegion()); 1216 region.startRegionOperation(Operation.COMPACT_REGION); 1217 LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString()); 1218 boolean major = false; 1219 byte [] family = null; 1220 Store store = null; 1221 if (request.hasFamily()) { 1222 family = request.getFamily().toByteArray(); 1223 store = region.getStore(family); 1224 if (store == null) { 1225 throw new ServiceException(new IOException("column family " + Bytes.toString(family) 1226 + " does not exist in region " + region.getRegionInfo().getRegionNameAsString())); 1227 } 1228 } 1229 if (request.hasMajor()) { 1230 major = request.getMajor(); 1231 } 1232 if (major) { 1233 if (family != null) { 1234 store.triggerMajorCompaction(); 1235 } else { 1236 region.triggerMajorCompaction(); 1237 } 1238 } 1239 1240 String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):""; 1241 if (LOG.isTraceEnabled()) { 1242 LOG.trace("User-triggered compaction requested for region " 1243 + region.getRegionInfo().getRegionNameAsString() + familyLogMsg); 1244 } 1245 String log = "User-triggered " + (major ? 
"major " : "") + "compaction" + familyLogMsg; 1246 if(family != null) { 1247 regionServer.compactSplitThread.requestCompaction(region, store, log, 1248 Store.PRIORITY_USER, null, RpcServer.getRequestUser()); 1249 } else { 1250 regionServer.compactSplitThread.requestCompaction(region, log, 1251 Store.PRIORITY_USER, null, RpcServer.getRequestUser()); 1252 } 1253 return CompactRegionResponse.newBuilder().build(); 1254 } catch (IOException ie) { 1255 throw new ServiceException(ie); 1256 } 1257 } 1258 1259 /** 1260 * Flush a region on the region server. 1261 * 1262 * @param controller the RPC controller 1263 * @param request the request 1264 * @throws ServiceException 1265 */ 1266 @Override 1267 @QosPriority(priority=HConstants.ADMIN_QOS) flushRegion(final RpcController controller, final FlushRegionRequest request)1268 public FlushRegionResponse flushRegion(final RpcController controller, 1269 final FlushRegionRequest request) throws ServiceException { 1270 try { 1271 checkOpen(); 1272 requestCount.increment(); 1273 Region region = getRegion(request.getRegion()); 1274 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); 1275 boolean shouldFlush = true; 1276 if (request.hasIfOlderThanTs()) { 1277 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); 1278 } 1279 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); 1280 if (shouldFlush) { 1281 boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ? 
1282 request.getWriteFlushWalMarker() : false; 1283 long startTime = EnvironmentEdgeManager.currentTime(); 1284 // Go behind the curtain so we can manage writing of the flush WAL marker 1285 HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl) 1286 ((HRegion)region).flushcache(true, writeFlushWalMarker); 1287 if (flushResult.isFlushSucceeded()) { 1288 long endTime = EnvironmentEdgeManager.currentTime(); 1289 regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); 1290 } 1291 boolean compactionNeeded = flushResult.isCompactionNeeded(); 1292 if (compactionNeeded) { 1293 regionServer.compactSplitThread.requestSystemCompaction(region, 1294 "Compaction through user triggered flush"); 1295 } 1296 builder.setFlushed(flushResult.isFlushSucceeded()); 1297 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); 1298 } 1299 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); 1300 return builder.build(); 1301 } catch (DroppedSnapshotException ex) { 1302 // Cache flush can fail in a few places. If it fails in a critical 1303 // section, we get a DroppedSnapshotException and a replay of wal 1304 // is required. Currently the only way to do this is a restart of 1305 // the server. 1306 regionServer.abort("Replay of WAL required. 
Forcing server shutdown", ex); 1307 throw new ServiceException(ex); 1308 } catch (IOException ie) { 1309 throw new ServiceException(ie); 1310 } 1311 } 1312 1313 @Override 1314 @QosPriority(priority=HConstants.ADMIN_QOS) 1315 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, 1316 final GetOnlineRegionRequest request) throws ServiceException { 1317 try { 1318 checkOpen(); 1319 requestCount.increment(); 1320 Map<String, Region> onlineRegions = regionServer.onlineRegions; 1321 List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size()); 1322 for (Region region: onlineRegions.values()) { 1323 list.add(region.getRegionInfo()); 1324 } 1325 Collections.sort(list); 1326 return ResponseConverter.buildGetOnlineRegionResponse(list); 1327 } catch (IOException ie) { 1328 throw new ServiceException(ie); 1329 } 1330 } 1331 1332 @Override 1333 @QosPriority(priority=HConstants.ADMIN_QOS) 1334 public GetRegionInfoResponse getRegionInfo(final RpcController controller, 1335 final GetRegionInfoRequest request) throws ServiceException { 1336 try { 1337 checkOpen(); 1338 requestCount.increment(); 1339 Region region = getRegion(request.getRegion()); 1340 HRegionInfo info = region.getRegionInfo(); 1341 GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder(); 1342 builder.setRegionInfo(HRegionInfo.convert(info)); 1343 if (request.hasCompactionState() && request.getCompactionState()) { 1344 builder.setCompactionState(region.getCompactionState()); 1345 } 1346 builder.setIsRecovering(region.isRecovering()); 1347 return builder.build(); 1348 } catch (IOException ie) { 1349 throw new ServiceException(ie); 1350 } 1351 } 1352 1353 /** 1354 * Get some information of the region server. 
1355 * 1356 * @param controller the RPC controller 1357 * @param request the request 1358 * @throws ServiceException 1359 */ 1360 @Override 1361 @QosPriority(priority=HConstants.ADMIN_QOS) 1362 public GetServerInfoResponse getServerInfo(final RpcController controller, 1363 final GetServerInfoRequest request) throws ServiceException { 1364 try { 1365 checkOpen(); 1366 } catch (IOException ie) { 1367 throw new ServiceException(ie); 1368 } 1369 requestCount.increment(); 1370 int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1; 1371 return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort); 1372 } 1373 1374 @Override 1375 @QosPriority(priority=HConstants.ADMIN_QOS) 1376 public GetStoreFileResponse getStoreFile(final RpcController controller, 1377 final GetStoreFileRequest request) throws ServiceException { 1378 try { 1379 checkOpen(); 1380 Region region = getRegion(request.getRegion()); 1381 requestCount.increment(); 1382 Set<byte[]> columnFamilies; 1383 if (request.getFamilyCount() == 0) { 1384 columnFamilies = region.getTableDesc().getFamiliesKeys(); 1385 } else { 1386 columnFamilies = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR); 1387 for (ByteString cf: request.getFamilyList()) { 1388 columnFamilies.add(cf.toByteArray()); 1389 } 1390 } 1391 int nCF = columnFamilies.size(); 1392 List<String> fileList = region.getStoreFileList( 1393 columnFamilies.toArray(new byte[nCF][])); 1394 GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder(); 1395 builder.addAllStoreFile(fileList); 1396 return builder.build(); 1397 } catch (IOException ie) { 1398 throw new ServiceException(ie); 1399 } 1400 } 1401 1402 /** 1403 * Merge regions on the region server. 
1404 * 1405 * @param controller the RPC controller 1406 * @param request the request 1407 * @return merge regions response 1408 * @throws ServiceException 1409 */ 1410 @Override 1411 @QosPriority(priority = HConstants.ADMIN_QOS) 1412 public MergeRegionsResponse mergeRegions(final RpcController controller, 1413 final MergeRegionsRequest request) throws ServiceException { 1414 try { 1415 checkOpen(); 1416 requestCount.increment(); 1417 Region regionA = getRegion(request.getRegionA()); 1418 Region regionB = getRegion(request.getRegionB()); 1419 boolean forcible = request.getForcible(); 1420 long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1; 1421 regionA.startRegionOperation(Operation.MERGE_REGION); 1422 regionB.startRegionOperation(Operation.MERGE_REGION); 1423 if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID || 1424 regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { 1425 throw new ServiceException(new MergeRegionException("Can't merge non-default replicas")); 1426 } 1427 LOG.info("Receiving merging request for " + regionA + ", " + regionB 1428 + ",forcible=" + forcible); 1429 long startTime = EnvironmentEdgeManager.currentTime(); 1430 FlushResult flushResult = regionA.flush(true); 1431 if (flushResult.isFlushSucceeded()) { 1432 long endTime = EnvironmentEdgeManager.currentTime(); 1433 regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); 1434 } 1435 startTime = EnvironmentEdgeManager.currentTime(); 1436 flushResult = regionB.flush(true); 1437 if (flushResult.isFlushSucceeded()) { 1438 long endTime = EnvironmentEdgeManager.currentTime(); 1439 regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); 1440 } 1441 regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible, 1442 masterSystemTime, RpcServer.getRequestUser()); 1443 return MergeRegionsResponse.newBuilder().build(); 1444 } catch (DroppedSnapshotException ex) { 1445 
regionServer.abort("Replay of WAL required. Forcing server shutdown", ex); 1446 throw new ServiceException(ex); 1447 } catch (IOException ie) { 1448 throw new ServiceException(ie); 1449 } 1450 } 1451 1452 /** 1453 * Open asynchronously a region or a set of regions on the region server. 1454 * 1455 * The opening is coordinated by ZooKeeper, and this method requires the znode to be created 1456 * before being called. As a consequence, this method should be called only from the master. 1457 * <p> 1458 * Different manages states for the region are: 1459 * </p><ul> 1460 * <li>region not opened: the region opening will start asynchronously.</li> 1461 * <li>a close is already in progress: this is considered as an error.</li> 1462 * <li>an open is already in progress: this new open request will be ignored. This is important 1463 * because the Master can do multiple requests if it crashes.</li> 1464 * <li>the region is already opened: this new open request will be ignored.</li> 1465 * </ul> 1466 * <p> 1467 * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign. 1468 * For a single region opening, errors are sent through a ServiceException. For bulk assign, 1469 * errors are put in the response as FAILED_OPENING. 1470 * </p> 1471 * @param controller the RPC controller 1472 * @param request the request 1473 * @throws ServiceException 1474 */ 1475 @Override 1476 @QosPriority(priority=HConstants.ADMIN_QOS) 1477 public OpenRegionResponse openRegion(final RpcController controller, 1478 final OpenRegionRequest request) throws ServiceException { 1479 requestCount.increment(); 1480 if (request.hasServerStartCode()) { 1481 // check that we are the same server that this RPC is intended for. 
1482 long serverStartCode = request.getServerStartCode(); 1483 if (regionServer.serverName.getStartcode() != serverStartCode) { 1484 throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " + 1485 "different server with startCode: " + serverStartCode + ", this server is: " 1486 + regionServer.serverName)); 1487 } 1488 } 1489 1490 OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder(); 1491 final int regionCount = request.getOpenInfoCount(); 1492 final Map<TableName, HTableDescriptor> htds = 1493 new HashMap<TableName, HTableDescriptor>(regionCount); 1494 final boolean isBulkAssign = regionCount > 1; 1495 try { 1496 checkOpen(); 1497 } catch (IOException ie) { 1498 TableName tableName = null; 1499 if (regionCount == 1) { 1500 RegionInfo ri = request.getOpenInfo(0).getRegion(); 1501 if (ri != null) { 1502 tableName = ProtobufUtil.toTableName(ri.getTableName()); 1503 } 1504 } 1505 if (!TableName.META_TABLE_NAME.equals(tableName)) { 1506 throw new ServiceException(ie); 1507 } 1508 // We are assigning meta, wait a little for regionserver to finish initialization. 1509 int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1510 HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout 1511 long endTime = System.currentTimeMillis() + timeout; 1512 synchronized (regionServer.online) { 1513 try { 1514 while (System.currentTimeMillis() <= endTime 1515 && !regionServer.isStopped() && !regionServer.isOnline()) { 1516 regionServer.online.wait(regionServer.msgInterval); 1517 } 1518 checkOpen(); 1519 } catch (InterruptedException t) { 1520 Thread.currentThread().interrupt(); 1521 throw new ServiceException(t); 1522 } catch (IOException e) { 1523 throw new ServiceException(e); 1524 } 1525 } 1526 } 1527 1528 long masterSystemTime = request.hasMasterSystemTime() ? 
request.getMasterSystemTime() : -1; 1529 1530 for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) { 1531 final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion()); 1532 OpenRegionCoordination coordination = regionServer.getCoordinatedStateManager(). 1533 getOpenRegionCoordination(); 1534 OpenRegionCoordination.OpenRegionDetails ord = 1535 coordination.parseFromProtoRequest(regionOpenInfo); 1536 1537 HTableDescriptor htd; 1538 try { 1539 final Region onlineRegion = regionServer.getFromOnlineRegions(region.getEncodedName()); 1540 if (onlineRegion != null) { 1541 //Check if the region can actually be opened. 1542 if (onlineRegion.getCoprocessorHost() != null) { 1543 onlineRegion.getCoprocessorHost().preOpen(); 1544 } 1545 // See HBASE-5094. Cross check with hbase:meta if still this RS is owning 1546 // the region. 1547 Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion( 1548 regionServer.getConnection(), region.getRegionName()); 1549 if (regionServer.serverName.equals(p.getSecond())) { 1550 Boolean closing = regionServer.regionsInTransitionInRS.get(region.getEncodedNameAsBytes()); 1551 // Map regionsInTransitionInRSOnly has an entry for a region only if the region 1552 // is in transition on this RS, so here closing can be null. If not null, it can 1553 // be true or false. True means the region is opening on this RS; while false 1554 // means the region is closing. Only return ALREADY_OPENED if not closing (i.e. 1555 // not in transition any more, or still transition to open. 
1556 if (!Boolean.FALSE.equals(closing) 1557 && regionServer.getFromOnlineRegions(region.getEncodedName()) != null) { 1558 LOG.warn("Attempted open of " + region.getEncodedName() 1559 + " but already online on this server"); 1560 builder.addOpeningState(RegionOpeningState.ALREADY_OPENED); 1561 continue; 1562 } 1563 } else { 1564 LOG.warn("The region " + region.getEncodedName() + " is online on this server" 1565 + " but hbase:meta does not have this server - continue opening."); 1566 regionServer.removeFromOnlineRegions(onlineRegion, null); 1567 } 1568 } 1569 LOG.info("Open " + region.getRegionNameAsString()); 1570 htd = htds.get(region.getTable()); 1571 if (htd == null) { 1572 htd = regionServer.tableDescriptors.get(region.getTable()); 1573 htds.put(region.getTable(), htd); 1574 } 1575 1576 final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent( 1577 region.getEncodedNameAsBytes(), Boolean.TRUE); 1578 1579 if (Boolean.FALSE.equals(previous)) { 1580 // There is a close in progress. We need to mark this open as failed in ZK. 1581 1582 coordination.tryTransitionFromOfflineToFailedOpen(regionServer, region, ord); 1583 1584 throw new RegionAlreadyInTransitionException("Received OPEN for the region:" 1585 + region.getRegionNameAsString() + " , which we are already trying to CLOSE "); 1586 } 1587 1588 if (Boolean.TRUE.equals(previous)) { 1589 // An open is in progress. This is supported, but let's log this. 1590 LOG.info("Receiving OPEN for the region:" + 1591 region.getRegionNameAsString() + " , which we are already trying to OPEN" 1592 + " - ignoring this new request for this region."); 1593 } 1594 1595 // We are opening this region. If it moves back and forth for whatever reason, we don't 1596 // want to keep returning the stale moved record while we are opening/if we close again. 
1597 regionServer.removeFromMovedRegions(region.getEncodedName()); 1598 1599 if (previous == null) { 1600 // check if the region to be opened is marked in recovering state in ZK 1601 if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(), 1602 region.getEncodedName())) { 1603 // Check if current region open is for distributedLogReplay. This check is to support 1604 // rolling restart/upgrade where we want to Master/RS see same configuration 1605 if (!regionOpenInfo.hasOpenForDistributedLogReplay() 1606 || regionOpenInfo.getOpenForDistributedLogReplay()) { 1607 regionServer.recoveringRegions.put(region.getEncodedName(), null); 1608 } else { 1609 // Remove stale recovery region from ZK when we open region not for recovering which 1610 // could happen when turn distributedLogReplay off from on. 1611 List<String> tmpRegions = new ArrayList<String>(); 1612 tmpRegions.add(region.getEncodedName()); 1613 ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(), 1614 tmpRegions); 1615 } 1616 } 1617 // If there is no action in progress, we can submit a specific handler. 1618 // Need to pass the expected version in the constructor. 
1619 if (region.isMetaRegion()) { 1620 regionServer.service.submit(new OpenMetaHandler( 1621 regionServer, regionServer, region, htd, masterSystemTime, coordination, ord)); 1622 } else { 1623 regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(), 1624 regionOpenInfo.getFavoredNodesList()); 1625 regionServer.service.submit(new OpenRegionHandler( 1626 regionServer, regionServer, region, htd, masterSystemTime, coordination, ord)); 1627 } 1628 } 1629 1630 builder.addOpeningState(RegionOpeningState.OPENED); 1631 1632 } catch (KeeperException zooKeeperEx) { 1633 LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx); 1634 throw new ServiceException(zooKeeperEx); 1635 } catch (IOException ie) { 1636 LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie); 1637 if (isBulkAssign) { 1638 builder.addOpeningState(RegionOpeningState.FAILED_OPENING); 1639 } else { 1640 throw new ServiceException(ie); 1641 } 1642 } 1643 } 1644 return builder.build(); 1645 } 1646 1647 /** 1648 * Wamrmup a region on this server. 1649 * 1650 * This method should only be called by Master. It synchrnously opens the region and 1651 * closes the region bringing the most important pages in cache. 
1652 * <p> 1653 * 1654 * @param controller the RPC controller 1655 * @param request the request 1656 * @throws ServiceException 1657 */ 1658 @Override warmupRegion(final RpcController controller, final WarmupRegionRequest request)1659 public WarmupRegionResponse warmupRegion(final RpcController controller, 1660 final WarmupRegionRequest request) throws ServiceException { 1661 1662 RegionInfo regionInfo = request.getRegionInfo(); 1663 final HRegionInfo region = HRegionInfo.convert(regionInfo); 1664 HTableDescriptor htd; 1665 WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance(); 1666 1667 try { 1668 checkOpen(); 1669 String encodedName = region.getEncodedName(); 1670 byte[] encodedNameBytes = region.getEncodedNameAsBytes(); 1671 final Region onlineRegion = regionServer.getFromOnlineRegions(encodedName); 1672 1673 if (onlineRegion != null) { 1674 LOG.info("Region already online. Skipping warming up " + region); 1675 return response; 1676 } 1677 1678 if (LOG.isDebugEnabled()) { 1679 LOG.debug("Warming up Region " + region.getRegionNameAsString()); 1680 } 1681 1682 htd = regionServer.tableDescriptors.get(region.getTable()); 1683 1684 if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) { 1685 LOG.info("Region is in transition. Skipping warmup " + region); 1686 return response; 1687 } 1688 1689 HRegion.warmupHRegion(region, htd, regionServer.getWAL(region), 1690 regionServer.getConfiguration(), regionServer, null); 1691 1692 } catch (IOException ie) { 1693 LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie); 1694 throw new ServiceException(ie); 1695 } 1696 1697 return response; 1698 } 1699 1700 /** 1701 * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is 1702 * that the given mutations will be durable on the receiving RS if this method returns without any 1703 * exception. 
1704 * @param controller the RPC controller 1705 * @param request the request 1706 * @throws ServiceException 1707 */ 1708 @Override 1709 @QosPriority(priority = HConstants.REPLAY_QOS) replay(final RpcController controller, final ReplicateWALEntryRequest request)1710 public ReplicateWALEntryResponse replay(final RpcController controller, 1711 final ReplicateWALEntryRequest request) throws ServiceException { 1712 long before = EnvironmentEdgeManager.currentTime(); 1713 CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner(); 1714 try { 1715 checkOpen(); 1716 List<WALEntry> entries = request.getEntryList(); 1717 if (entries == null || entries.isEmpty()) { 1718 // empty input 1719 return ReplicateWALEntryResponse.newBuilder().build(); 1720 } 1721 ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); 1722 Region region = regionServer.getRegionByEncodedName(regionName.toStringUtf8()); 1723 RegionCoprocessorHost coprocessorHost = 1724 ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo()) 1725 ? region.getCoprocessorHost() 1726 : null; // do not invoke coprocessors if this is a secondary region replica 1727 List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>(); 1728 1729 // Skip adding the edits to WAL if this is a secondary region replica 1730 boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); 1731 Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL; 1732 1733 for (WALEntry entry : entries) { 1734 if (!regionName.equals(entry.getKey().getEncodedRegionName())) { 1735 throw new NotServingRegionException("Replay request contains entries from multiple " + 1736 "regions. First region:" + regionName.toStringUtf8() + " , other region:" 1737 + entry.getKey().getEncodedRegionName()); 1738 } 1739 if (regionServer.nonceManager != null && isPrimary) { 1740 long nonceGroup = entry.getKey().hasNonceGroup() 1741 ? 
entry.getKey().getNonceGroup() : HConstants.NO_NONCE; 1742 long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE; 1743 regionServer.nonceManager.reportOperationFromWal( 1744 nonceGroup, 1745 nonce, 1746 entry.getKey().getWriteTime()); 1747 } 1748 Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : 1749 new Pair<WALKey, WALEdit>(); 1750 List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry, 1751 cells, walEntry, durability); 1752 if (coprocessorHost != null) { 1753 // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a 1754 // KeyValue. 1755 if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(), 1756 walEntry.getSecond())) { 1757 // if bypass this log entry, ignore it ... 1758 continue; 1759 } 1760 walEntries.add(walEntry); 1761 } 1762 if(edits!=null && !edits.isEmpty()) { 1763 long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? 1764 entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber(); 1765 OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId); 1766 // check if it's a partial success 1767 for (int i = 0; result != null && i < result.length; i++) { 1768 if (result[i] != OperationStatus.SUCCESS) { 1769 throw new IOException(result[i].getExceptionMsg()); 1770 } 1771 } 1772 } 1773 } 1774 1775 //sync wal at the end because ASYNC_WAL is used above 1776 WAL wal = getWAL(region); 1777 if (wal != null) { 1778 wal.sync(); 1779 } 1780 1781 if (coprocessorHost != null) { 1782 for (Pair<WALKey, WALEdit> entry : walEntries) { 1783 coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(), 1784 entry.getSecond()); 1785 } 1786 } 1787 return ReplicateWALEntryResponse.newBuilder().build(); 1788 } catch (IOException ie) { 1789 throw new ServiceException(ie); 1790 } finally { 1791 if (regionServer.metricsRegionServer != null) { 1792 regionServer.metricsRegionServer.updateReplay( 1793 
EnvironmentEdgeManager.currentTime() - before); 1794 } 1795 } 1796 } 1797 getWAL(Region region)1798 WAL getWAL(Region region) { 1799 return ((HRegion)region).getWAL(); 1800 } 1801 1802 /** 1803 * Replicate WAL entries on the region server. 1804 * 1805 * @param controller the RPC controller 1806 * @param request the request 1807 * @throws ServiceException 1808 */ 1809 @Override 1810 @QosPriority(priority=HConstants.REPLICATION_QOS) replicateWALEntry(final RpcController controller, final ReplicateWALEntryRequest request)1811 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, 1812 final ReplicateWALEntryRequest request) throws ServiceException { 1813 try { 1814 checkOpen(); 1815 if (regionServer.replicationSinkHandler != null) { 1816 requestCount.increment(); 1817 List<WALEntry> entries = request.getEntryList(); 1818 CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner(); 1819 regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner); 1820 regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner); 1821 regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner); 1822 return ReplicateWALEntryResponse.newBuilder().build(); 1823 } else { 1824 throw new ServiceException("Replication services are not initialized yet"); 1825 } 1826 } catch (IOException ie) { 1827 throw new ServiceException(ie); 1828 } 1829 } 1830 1831 /** 1832 * Roll the WAL writer of the region server. 
1833 * @param controller the RPC controller 1834 * @param request the request 1835 * @throws ServiceException 1836 */ 1837 @Override rollWALWriter(final RpcController controller, final RollWALWriterRequest request)1838 public RollWALWriterResponse rollWALWriter(final RpcController controller, 1839 final RollWALWriterRequest request) throws ServiceException { 1840 try { 1841 checkOpen(); 1842 requestCount.increment(); 1843 regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest(); 1844 regionServer.walRoller.requestRollAll(); 1845 regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest(); 1846 RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder(); 1847 return builder.build(); 1848 } catch (IOException ie) { 1849 throw new ServiceException(ie); 1850 } 1851 } 1852 1853 /** 1854 * Split a region on the region server. 1855 * 1856 * @param controller the RPC controller 1857 * @param request the request 1858 * @throws ServiceException 1859 */ 1860 @Override 1861 @QosPriority(priority=HConstants.ADMIN_QOS) splitRegion(final RpcController controller, final SplitRegionRequest request)1862 public SplitRegionResponse splitRegion(final RpcController controller, 1863 final SplitRegionRequest request) throws ServiceException { 1864 try { 1865 checkOpen(); 1866 requestCount.increment(); 1867 Region region = getRegion(request.getRegion()); 1868 region.startRegionOperation(Operation.SPLIT_REGION); 1869 if (region.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { 1870 throw new IOException("Can't split replicas directly. 
" 1871 + "Replicas are auto-split when their primary is split."); 1872 } 1873 LOG.info("Splitting " + region.getRegionInfo().getRegionNameAsString()); 1874 long startTime = EnvironmentEdgeManager.currentTime(); 1875 FlushResult flushResult = region.flush(true); 1876 if (flushResult.isFlushSucceeded()) { 1877 long endTime = EnvironmentEdgeManager.currentTime(); 1878 regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); 1879 } 1880 byte[] splitPoint = null; 1881 if (request.hasSplitPoint()) { 1882 splitPoint = request.getSplitPoint().toByteArray(); 1883 } 1884 ((HRegion)region).forceSplit(splitPoint); 1885 regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit(), 1886 RpcServer.getRequestUser()); 1887 return SplitRegionResponse.newBuilder().build(); 1888 } catch (DroppedSnapshotException ex) { 1889 regionServer.abort("Replay of WAL required. Forcing server shutdown", ex); 1890 throw new ServiceException(ex); 1891 } catch (IOException ie) { 1892 throw new ServiceException(ie); 1893 } 1894 } 1895 1896 /** 1897 * Stop the region server. 
1898 * 1899 * @param controller the RPC controller 1900 * @param request the request 1901 * @throws ServiceException 1902 */ 1903 @Override 1904 @QosPriority(priority=HConstants.ADMIN_QOS) stopServer(final RpcController controller, final StopServerRequest request)1905 public StopServerResponse stopServer(final RpcController controller, 1906 final StopServerRequest request) throws ServiceException { 1907 requestCount.increment(); 1908 String reason = request.getReason(); 1909 regionServer.stop(reason); 1910 return StopServerResponse.newBuilder().build(); 1911 } 1912 1913 @Override updateFavoredNodes(RpcController controller, UpdateFavoredNodesRequest request)1914 public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, 1915 UpdateFavoredNodesRequest request) throws ServiceException { 1916 List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList(); 1917 UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder(); 1918 for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) { 1919 HRegionInfo hri = HRegionInfo.convert(regionUpdateInfo.getRegion()); 1920 regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(), 1921 regionUpdateInfo.getFavoredNodesList()); 1922 } 1923 respBuilder.setResponse(openInfoList.size()); 1924 return respBuilder.build(); 1925 } 1926 1927 /** 1928 * Atomically bulk load several HFiles into an open region 1929 * @return true if successful, false is failed but recoverably (no action) 1930 * @throws ServiceException if failed unrecoverably 1931 */ 1932 @Override bulkLoadHFile(final RpcController controller, final BulkLoadHFileRequest request)1933 public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller, 1934 final BulkLoadHFileRequest request) throws ServiceException { 1935 try { 1936 checkOpen(); 1937 requestCount.increment(); 1938 Region region = getRegion(request.getRegion()); 1939 List<Pair<byte[], String>> 
familyPaths = new ArrayList<Pair<byte[], String>>(); 1940 for (FamilyPath familyPath: request.getFamilyPathList()) { 1941 familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(), 1942 familyPath.getPath())); 1943 } 1944 boolean bypass = false; 1945 if (region.getCoprocessorHost() != null) { 1946 bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); 1947 } 1948 boolean loaded = false; 1949 if (!bypass) { 1950 loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null); 1951 } 1952 if (region.getCoprocessorHost() != null) { 1953 loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded); 1954 } 1955 BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder(); 1956 builder.setLoaded(loaded); 1957 return builder.build(); 1958 } catch (IOException ie) { 1959 throw new ServiceException(ie); 1960 } 1961 } 1962 1963 @Override execService(final RpcController controller, final CoprocessorServiceRequest request)1964 public CoprocessorServiceResponse execService(final RpcController controller, 1965 final CoprocessorServiceRequest request) throws ServiceException { 1966 try { 1967 checkOpen(); 1968 requestCount.increment(); 1969 Region region = getRegion(request.getRegion()); 1970 Message result = execServiceOnRegion(region, request.getCall()); 1971 CoprocessorServiceResponse.Builder builder = 1972 CoprocessorServiceResponse.newBuilder(); 1973 builder.setRegion(RequestConverter.buildRegionSpecifier( 1974 RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName())); 1975 builder.setValue( 1976 builder.getValueBuilder().setName(result.getClass().getName()) 1977 .setValue(result.toByteString())); 1978 return builder.build(); 1979 } catch (IOException ie) { 1980 throw new ServiceException(ie); 1981 } 1982 } 1983 execServiceOnRegion(Region region, final ClientProtos.CoprocessorServiceCall serviceCall)1984 private Message execServiceOnRegion(Region region, 1985 final 
ClientProtos.CoprocessorServiceCall serviceCall) throws IOException { 1986 // ignore the passed in controller (from the serialized call) 1987 ServerRpcController execController = new ServerRpcController(); 1988 return region.execService(execController, serviceCall); 1989 } 1990 1991 /** 1992 * Get data from a table. 1993 * 1994 * @param controller the RPC controller 1995 * @param request the get request 1996 * @throws ServiceException 1997 */ 1998 @Override get(final RpcController controller, final GetRequest request)1999 public GetResponse get(final RpcController controller, 2000 final GetRequest request) throws ServiceException { 2001 long before = EnvironmentEdgeManager.currentTime(); 2002 OperationQuota quota = null; 2003 try { 2004 checkOpen(); 2005 requestCount.increment(); 2006 Region region = getRegion(request.getRegion()); 2007 2008 GetResponse.Builder builder = GetResponse.newBuilder(); 2009 ClientProtos.Get get = request.getGet(); 2010 Boolean existence = null; 2011 Result r = null; 2012 quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET); 2013 2014 if (get.hasClosestRowBefore() && get.getClosestRowBefore()) { 2015 if (get.getColumnCount() != 1) { 2016 throw new DoNotRetryIOException( 2017 "get ClosestRowBefore supports one and only one family now, not " 2018 + get.getColumnCount() + " families"); 2019 } 2020 byte[] row = get.getRow().toByteArray(); 2021 byte[] family = get.getColumn(0).getFamily().toByteArray(); 2022 r = region.getClosestRowBefore(row, family); 2023 } else { 2024 Get clientGet = ProtobufUtil.toGet(get); 2025 if (get.getExistenceOnly() && region.getCoprocessorHost() != null) { 2026 existence = region.getCoprocessorHost().preExists(clientGet); 2027 } 2028 if (existence == null) { 2029 r = region.get(clientGet); 2030 if (get.getExistenceOnly()) { 2031 boolean exists = r.getExists(); 2032 if (region.getCoprocessorHost() != null) { 2033 exists = region.getCoprocessorHost().postExists(clientGet, exists); 2034 } 2035 
existence = exists; 2036 } 2037 } 2038 } 2039 if (existence != null){ 2040 ClientProtos.Result pbr = 2041 ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0); 2042 builder.setResult(pbr); 2043 } else if (r != null) { 2044 ClientProtos.Result pbr = ProtobufUtil.toResult(r); 2045 builder.setResult(pbr); 2046 } 2047 if (r != null) { 2048 quota.addGetResult(r); 2049 } 2050 return builder.build(); 2051 } catch (IOException ie) { 2052 throw new ServiceException(ie); 2053 } finally { 2054 if (regionServer.metricsRegionServer != null) { 2055 regionServer.metricsRegionServer.updateGet( 2056 EnvironmentEdgeManager.currentTime() - before); 2057 } 2058 if (quota != null) { 2059 quota.close(); 2060 } 2061 } 2062 } 2063 2064 /** 2065 * Execute multiple actions on a table: get, mutate, and/or execCoprocessor 2066 * 2067 * @param rpcc the RPC controller 2068 * @param request the multi request 2069 * @throws ServiceException 2070 */ 2071 @Override multi(final RpcController rpcc, final MultiRequest request)2072 public MultiResponse multi(final RpcController rpcc, final MultiRequest request) 2073 throws ServiceException { 2074 try { 2075 checkOpen(); 2076 } catch (IOException ie) { 2077 throw new ServiceException(ie); 2078 } 2079 2080 // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. 2081 // It is also the conduit via which we pass back data. 2082 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc; 2083 CellScanner cellScanner = controller != null ? controller.cellScanner(): null; 2084 if (controller != null) { 2085 controller.setCellScanner(null); 2086 } 2087 2088 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; 2089 2090 // this will contain all the cells that we need to return. It's created later, if needed. 
2091 List<CellScannable> cellsToReturn = null; 2092 MultiResponse.Builder responseBuilder = MultiResponse.newBuilder(); 2093 RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder(); 2094 Boolean processed = null; 2095 2096 for (RegionAction regionAction : request.getRegionActionList()) { 2097 this.requestCount.add(regionAction.getActionCount()); 2098 OperationQuota quota; 2099 Region region; 2100 regionActionResultBuilder.clear(); 2101 try { 2102 region = getRegion(regionAction.getRegion()); 2103 quota = getQuotaManager().checkQuota(region, regionAction.getActionList()); 2104 } catch (IOException e) { 2105 rpcServer.getMetrics().exception(e); 2106 regionActionResultBuilder.setException(ResponseConverter.buildException(e)); 2107 responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); 2108 // All Mutations in this RegionAction not executed as we can not see the Region online here 2109 // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner 2110 // corresponding to these Mutations. 2111 if (cellScanner != null) { 2112 skipCellsForMutations(regionAction.getActionList(), cellScanner); 2113 } 2114 continue; // For this region it's a failure. 2115 } 2116 2117 if (regionAction.hasAtomic() && regionAction.getAtomic()) { 2118 // How does this call happen? It may need some work to play well w/ the surroundings. 2119 // Need to return an item per Action along w/ Action index. TODO. 
2120 try { 2121 if (request.hasCondition()) { 2122 Condition condition = request.getCondition(); 2123 byte[] row = condition.getRow().toByteArray(); 2124 byte[] family = condition.getFamily().toByteArray(); 2125 byte[] qualifier = condition.getQualifier().toByteArray(); 2126 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name()); 2127 ByteArrayComparable comparator = 2128 ProtobufUtil.toComparator(condition.getComparator()); 2129 processed = checkAndRowMutate(region, regionAction.getActionList(), 2130 cellScanner, row, family, qualifier, compareOp, comparator); 2131 } else { 2132 ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(), 2133 cellScanner); 2134 // add the stats to the request 2135 if(stats != null) { 2136 responseBuilder.addRegionActionResult(RegionActionResult.newBuilder() 2137 .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats))); 2138 } 2139 processed = Boolean.TRUE; 2140 } 2141 } catch (IOException e) { 2142 rpcServer.getMetrics().exception(e); 2143 // As it's atomic, we may expect it's a global failure. 2144 regionActionResultBuilder.setException(ResponseConverter.buildException(e)); 2145 } 2146 } else { 2147 // doNonAtomicRegionMutation manages the exception internally 2148 cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner, 2149 regionActionResultBuilder, cellsToReturn, nonceGroup); 2150 } 2151 responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); 2152 quota.close(); 2153 } 2154 // Load the controller with the Cells to return. 
2155 if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) { 2156 controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn)); 2157 } 2158 if (processed != null) { 2159 responseBuilder.setProcessed(processed); 2160 } 2161 return responseBuilder.build(); 2162 } 2163 skipCellsForMutations(List<Action> actions, CellScanner cellScanner)2164 private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) { 2165 for (Action action : actions) { 2166 skipCellsForMutation(action, cellScanner); 2167 } 2168 } 2169 skipCellsForMutation(Action action, CellScanner cellScanner)2170 private void skipCellsForMutation(Action action, CellScanner cellScanner) { 2171 try { 2172 if (action.hasMutation()) { 2173 MutationProto m = action.getMutation(); 2174 if (m.hasAssociatedCellCount()) { 2175 for (int i = 0; i < m.getAssociatedCellCount(); i++) { 2176 cellScanner.advance(); 2177 } 2178 } 2179 } 2180 } catch (IOException e) { 2181 // No need to handle these Individual Muatation level issue. Any way this entire RegionAction 2182 // marked as failed as we could not see the Region here. At client side the top level 2183 // RegionAction exception will be considered first. 2184 LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e); 2185 } 2186 } 2187 2188 /** 2189 * Mutate data in a table. 2190 * 2191 * @param rpcc the RPC controller 2192 * @param request the mutate request 2193 * @throws ServiceException 2194 */ 2195 @Override mutate(final RpcController rpcc, final MutateRequest request)2196 public MutateResponse mutate(final RpcController rpcc, 2197 final MutateRequest request) throws ServiceException { 2198 // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. 2199 // It is also the conduit via which we pass back data. 2200 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc; 2201 CellScanner cellScanner = controller != null ? 
controller.cellScanner() : null; 2202 OperationQuota quota = null; 2203 // Clear scanner so we are not holding on to reference across call. 2204 if (controller != null) { 2205 controller.setCellScanner(null); 2206 } 2207 try { 2208 checkOpen(); 2209 requestCount.increment(); 2210 Region region = getRegion(request.getRegion()); 2211 MutateResponse.Builder builder = MutateResponse.newBuilder(); 2212 MutationProto mutation = request.getMutation(); 2213 if (!region.getRegionInfo().isMetaTable()) { 2214 regionServer.cacheFlusher.reclaimMemStoreMemory(); 2215 } 2216 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; 2217 Result r = null; 2218 Boolean processed = null; 2219 MutationType type = mutation.getMutateType(); 2220 long mutationSize = 0; 2221 quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE); 2222 switch (type) { 2223 case APPEND: 2224 // TODO: this doesn't actually check anything. 2225 r = append(region, quota, mutation, cellScanner, nonceGroup); 2226 break; 2227 case INCREMENT: 2228 // TODO: this doesn't actually check anything. 
2229 r = increment(region, quota, mutation, cellScanner, nonceGroup); 2230 break; 2231 case PUT: 2232 Put put = ProtobufUtil.toPut(mutation, cellScanner); 2233 quota.addMutation(put); 2234 if (request.hasCondition()) { 2235 Condition condition = request.getCondition(); 2236 byte[] row = condition.getRow().toByteArray(); 2237 byte[] family = condition.getFamily().toByteArray(); 2238 byte[] qualifier = condition.getQualifier().toByteArray(); 2239 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name()); 2240 ByteArrayComparable comparator = 2241 ProtobufUtil.toComparator(condition.getComparator()); 2242 if (region.getCoprocessorHost() != null) { 2243 processed = region.getCoprocessorHost().preCheckAndPut( 2244 row, family, qualifier, compareOp, comparator, put); 2245 } 2246 if (processed == null) { 2247 boolean result = region.checkAndMutate(row, family, 2248 qualifier, compareOp, comparator, put, true); 2249 if (region.getCoprocessorHost() != null) { 2250 result = region.getCoprocessorHost().postCheckAndPut(row, family, 2251 qualifier, compareOp, comparator, put, result); 2252 } 2253 processed = result; 2254 } 2255 } else { 2256 region.put(put); 2257 processed = Boolean.TRUE; 2258 } 2259 break; 2260 case DELETE: 2261 Delete delete = ProtobufUtil.toDelete(mutation, cellScanner); 2262 quota.addMutation(delete); 2263 if (request.hasCondition()) { 2264 Condition condition = request.getCondition(); 2265 byte[] row = condition.getRow().toByteArray(); 2266 byte[] family = condition.getFamily().toByteArray(); 2267 byte[] qualifier = condition.getQualifier().toByteArray(); 2268 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name()); 2269 ByteArrayComparable comparator = 2270 ProtobufUtil.toComparator(condition.getComparator()); 2271 if (region.getCoprocessorHost() != null) { 2272 processed = region.getCoprocessorHost().preCheckAndDelete( 2273 row, family, qualifier, compareOp, comparator, delete); 2274 } 2275 if (processed == null) { 
2276 boolean result = region.checkAndMutate(row, family, 2277 qualifier, compareOp, comparator, delete, true); 2278 if (region.getCoprocessorHost() != null) { 2279 result = region.getCoprocessorHost().postCheckAndDelete(row, family, 2280 qualifier, compareOp, comparator, delete, result); 2281 } 2282 processed = result; 2283 } 2284 } else { 2285 region.delete(delete); 2286 processed = Boolean.TRUE; 2287 } 2288 break; 2289 default: 2290 throw new DoNotRetryIOException( 2291 "Unsupported mutate type: " + type.name()); 2292 } 2293 if (processed != null) builder.setProcessed(processed.booleanValue()); 2294 addResult(builder, r, controller); 2295 return builder.build(); 2296 } catch (IOException ie) { 2297 regionServer.checkFileSystem(); 2298 throw new ServiceException(ie); 2299 } finally { 2300 if (quota != null) { 2301 quota.close(); 2302 } 2303 } 2304 } 2305

  /**
   * Scan data in a table.
   *
   * <p>The request must carry either a scanner id or a scan specification:
   * <ul>
   * <li>with a scanner id, the scanner is looked up in {@code scanners}; a missing entry
   * raises {@link UnknownScannerException}, and a region instance different from the one the
   * scanner was registered with raises {@link NotServingRegionException};</li>
   * <li>otherwise a new scanner is opened on the requested region (coprocessor
   * pre/post-open hooks included) and registered through {@code addScanner}.</li>
   * </ul>
   *
   * <p>After that the call proceeds in this order:
   * <ol>
   * <li>a renew request removes and re-adds the scanner lease, advances the expected call
   * sequence and returns the still-empty response;</li>
   * <li>the scan quota is checked and, when more than zero rows are requested, results are
   * collected until the row, size or time limit is reached or the scanner has no more
   * rows;</li>
   * <li>the scanner is closed when the filter is done and no results were produced, or when
   * the client asked for it (unless the coprocessor pre-close hook bypasses the close).</li>
   * </ol>
   *
   * @param controller the RPC controller
   * @param request the scan request; must have a scanner id or a scan
   * @return the scan response built for this call
   * @throws ServiceException wrapping any {@link IOException} raised while serving the request
   */
  @Override
  public ScanResponse scan(final RpcController controller, final ScanRequest request)
  throws ServiceException {
    OperationQuota quota = null;
    Leases.Lease lease = null;
    String scannerName = null;
    try {
      if (!request.hasScannerId() && !request.hasScan()) {
        throw new DoNotRetryIOException(
          "Missing required input: scannerId or scan");
      }
      long scannerId = -1;
      if (request.hasScannerId()) {
        scannerId = request.getScannerId();
        scannerName = String.valueOf(scannerId);
      }
      try {
        checkOpen();
      } catch (IOException e) {
        // If checkOpen failed, server not running or filesystem gone,
        // cancel this lease; filesystem is gone or we're closing or something.
        if (scannerName != null) {
          LOG.debug("Server shutting down and client tried to access missing scanner "
            + scannerName);
          if (regionServer.leases != null) {
            try {
              regionServer.leases.cancelLease(scannerName);
            } catch (LeaseException le) {
              // No problem, ignore
              if (LOG.isTraceEnabled()) {
                LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
              }
            }
          }
        }
        throw e;
      }
      requestCount.increment();

      int ttl = 0;
      Region region = null;
      RegionScanner scanner = null;
      RegionScannerHolder rsh = null;
      boolean moreResults = true;
      boolean closeScanner = false;
      boolean isSmallScan = false;
      RpcCallContext context = RpcServer.getCurrentCall();
      Object lastBlock = null;

      ScanResponse.Builder builder = ScanResponse.newBuilder();
      if (request.hasCloseScanner()) {
        closeScanner = request.getCloseScanner();
      }
      // Default row count: 0 for a close request, 1 otherwise; an explicit value wins.
      int rows = closeScanner ? 0 : 1;
      if (request.hasNumberOfRows()) {
        rows = request.getNumberOfRows();
      }
      if (request.hasScannerId()) {
        // Continue with a scanner that was registered by an earlier call.
        rsh = scanners.get(scannerName);
        if (rsh == null) {
          LOG.info("Client tried to access missing scanner " + scannerName);
          throw new UnknownScannerException(
            "Name: " + scannerName + ", already closed?");
        }
        scanner = rsh.s;
        HRegionInfo hri = scanner.getRegionInfo();
        region = regionServer.getRegion(hri.getRegionName());
        if (region != rsh.r) { // Yes, should be the same instance
          throw new NotServingRegionException("Region was re-opened after the scanner "
            + scannerName + " was created: " + hri.getRegionNameAsString());
        }
      } else {
        // No scanner id: open a new scanner from the scan carried by the request.
        region = getRegion(request.getRegion());
        ClientProtos.Scan protoScan = request.getScan();
        boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
        Scan scan = ProtobufUtil.toScan(protoScan);
        // if the request doesn't set this, get the default region setting.
        if (!isLoadingCfsOnDemandSet) {
          scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
        }

        isSmallScan = scan.isSmall();
        if (!scan.hasFamilies()) {
          // Adding all families to scanner
          for (byte[] family: region.getTableDesc().getFamiliesKeys()) {
            scan.addFamily(family);
          }
        }

        // The pre-open hook may supply the scanner itself; otherwise ask the region for one.
        if (region.getCoprocessorHost() != null) {
          scanner = region.getCoprocessorHost().preScannerOpen(scan);
        }
        if (scanner == null) {
          scanner = region.getScanner(scan);
        }
        if (region.getCoprocessorHost() != null) {
          scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
        }
        scannerId = addScanner(scanner, region);
        scannerName = String.valueOf(scannerId);
        ttl = this.scannerLeaseTimeoutPeriod;
      }
      if (request.hasRenew() && request.getRenew()) {
        // Renew only: remove and re-add the lease, then return without scanning.
        rsh = scanners.get(scannerName);
        lease = regionServer.leases.removeLease(scannerName);
        if (lease != null && rsh != null) {
          regionServer.leases.addLease(lease);
          // Increment the nextCallSeq value which is the next expected from client.
          rsh.incNextCallSeq();
        }
        return builder.build();
      }

      quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
      long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
      if (rows > 0) {
        // if nextCallSeq does not match throw Exception straight away. This needs to be
        // performed even before checking of Lease.
        // See HBASE-5974
        if (request.hasNextCallSeq()) {
          if (rsh == null) {
            rsh = scanners.get(scannerName);
          }
          if (rsh != null) {
            if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
              throw new OutOfOrderScannerNextException(
                "Expected nextCallSeq: " + rsh.getNextCallSeq()
                + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
                "; request=" + TextFormat.shortDebugString(request));
            }
            // Increment the nextCallSeq value which is the next expected from client.
            rsh.incNextCallSeq();
          }
        }
        try {
          // Remove lease while its being processed in server; protects against case
          // where processing of request takes > lease expiration time.
          lease = regionServer.leases.removeLease(scannerName);
          List<Result> results = new ArrayList<Result>();

          boolean done = false;
          // Call coprocessor. Get region info from scanner.
          if (region != null && region.getCoprocessorHost() != null) {
            Boolean bypass = region.getCoprocessorHost().preScannerNext(
              scanner, results, rows);
            if (!results.isEmpty()) {
              for (Result r : results) {
                lastBlock = addSize(context, r, lastBlock);
              }
            }
            if (bypass != null && bypass.booleanValue()) {
              done = true;
            }
          }

          if (!done) {
            long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
            if (maxResultSize <= 0) {
              maxResultSize = maxQuotaResultSize;
            }
            // This is cells inside a row. Default size is 10 so if many versions or many cfs,
            // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
            // arbitrary 32. TODO: keep record of general size of results being returned.
            List<Cell> values = new ArrayList<Cell>(32);
            region.startRegionOperation(Operation.SCAN);
            try {
              int i = 0;
              synchronized(scanner) {
                boolean stale = (region.getRegionInfo().getReplicaId() != 0);
                boolean clientHandlesPartials =
                    request.hasClientHandlesPartials() && request.getClientHandlesPartials();
                boolean clientHandlesHeartbeats =
                    request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

                // On the server side we must ensure that the correct ordering of partial results is
                // returned to the client to allow them to properly reconstruct the partial results.
                // If the coprocessor host is adding to the result list, we cannot guarantee the
                // correct ordering of partial results and so we prevent partial results from being
                // formed.
                boolean serverGuaranteesOrderOfPartials = results.isEmpty();
                boolean allowPartialResults =
                    clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
                boolean moreRows = false;

                // Heartbeat messages occur when the processing of the ScanRequest exceeds a
                // certain time threshold on the server. When the time threshold is exceeded, the
                // server stops the scan and sends back whatever Results it has accumulated within
                // that time period (may be empty). Since heartbeat messages have the potential to
                // create partial Results (in the event that the timeout occurs in the middle of a
                // row), we must only generate heartbeat messages when the client can handle both
                // heartbeats AND partials
                boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

                // Default value of timeLimit is negative to indicate no timeLimit should be
                // enforced.
                long timeLimit = -1;

                // Set the time limit to be half of the more restrictive timeout value (one of the
                // timeout values must be positive). In the event that both values are positive, the
                // more restrictive of the two is used to calculate the limit.
                if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
                  long timeLimitDelta;
                  if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
                    timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
                  } else {
                    timeLimitDelta =
                        scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
                  }
                  // Use half of whichever timeout value was more restrictive... But don't allow
                  // the time limit to be less than the allowable minimum (could cause an
                  // immediate timeout before scanning any data).
                  timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
                  timeLimit = System.currentTimeMillis() + timeLimitDelta;
                }

                final LimitScope sizeScope =
                    allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
                final LimitScope timeScope =
                    allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

                boolean trackMetrics =
                    request.hasTrackScanMetrics() && request.getTrackScanMetrics();

                // Configure with limits for this RPC. Set keep progress true since size progress
                // towards size limit should be kept between calls to nextRaw
                ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
                contextBuilder.setSizeLimit(sizeScope, maxResultSize);
                contextBuilder.setBatchLimit(scanner.getBatch());
                contextBuilder.setTimeLimit(timeScope, timeLimit);
                contextBuilder.setTrackMetrics(trackMetrics);
                ScannerContext scannerContext = contextBuilder.build();

                boolean limitReached = false;
                while (i < rows) {
                  // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
                  // batch limit is a limit on the number of cells per Result. Thus, if progress is
                  // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
                  // reset the batch progress between nextRaw invocations since we don't want the
                  // batch progress from previous calls to affect future calls
                  scannerContext.setBatchProgress(0);

                  // Collect values to be returned here
                  moreRows = scanner.nextRaw(values, scannerContext);

                  if (!values.isEmpty()) {
                    final boolean partial = scannerContext.partialResultFormed();
                    Result r = Result.create(values, null, stale, partial);
                    lastBlock = addSize(context, r, lastBlock);
                    results.add(r);
                    i++;
                  }

                  boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
                  boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
                  boolean rowLimitReached = i >= rows;
                  limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;

                  if (limitReached || !moreRows) {
                    if (LOG.isTraceEnabled()) {
                      LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
                          + moreRows + " scannerContext: " + scannerContext);
                    }
                    // We only want to mark a ScanResponse as a heartbeat message in the event that
                    // there are more values to be read server side. If there aren't more values,
                    // marking it as a heartbeat is wasteful because the client will need to issue
                    // another ScanRequest only to realize that they already have all the values
                    if (moreRows) {
                      // Heartbeat messages occur when the time limit has been reached.
                      builder.setHeartbeatMessage(timeLimitReached);
                    }
                    break;
                  }
                  values.clear();
                }

                if (limitReached || moreRows) {
                  // We stopped prematurely
                  builder.setMoreResultsInRegion(true);
                } else {
                  // We didn't get a single batch
                  builder.setMoreResultsInRegion(false);
                }

                // Check to see if the client requested that we track metrics server side. If the
                // client requested metrics, retrieve the metrics from the scanner context.
                if (trackMetrics) {
                  Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
                  ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
                  NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

                  for (Entry<String, Long> entry : metrics.entrySet()) {
                    pairBuilder.setName(entry.getKey());
                    pairBuilder.setValue(entry.getValue());
                    metricBuilder.addMetrics(pairBuilder.build());
                  }

                  builder.setScanMetrics(metricBuilder.build());
                }
              }
              region.updateReadRequestsCount(i);
              long responseCellSize = context != null ? context.getResponseCellSize() : 0;
              region.getMetrics().updateScanNext(responseCellSize);
              if (regionServer.metricsRegionServer != null) {
                regionServer.metricsRegionServer.updateScannerNext(responseCellSize);
              }
            } finally {
              region.closeRegionOperation();
            }

            // coprocessor postNext hook
            if (region != null && region.getCoprocessorHost() != null) {
              region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
            }
          }

          quota.addScanResult(results);

          // If the scanner's filter - if any - is done with the scan
          // and wants to tell the client to stop the scan. This is done by passing
          // a null result, and setting moreResults to false.
          if (scanner.isFilterDone() && results.isEmpty()) {
            moreResults = false;
            results = null;
          } else {
            addResults(builder, results, controller,
              RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
          }
        } catch (IOException e) {
          // if we have an exception on scanner next and we are using the callSeq
          // we should rollback because the client will retry with the same callSeq
          // and get an OutOfOrderScannerNextException if we don't do so.
          if (rsh != null && request.hasNextCallSeq()) {
            rsh.rollbackNextCallSeq();
          }
          throw e;
        } finally {
          // We're done. On way out re-add the above removed lease.
          // Adding resets expiration time on lease.
          if (scanners.containsKey(scannerName)) {
            if (lease != null) {
              regionServer.leases.addLease(lease);
            }
            ttl = this.scannerLeaseTimeoutPeriod;
          }
        }
      }

      if (!moreResults || closeScanner) {
        ttl = 0;
        moreResults = false;
        if (region != null && region.getCoprocessorHost() != null) {
          if (region.getCoprocessorHost().preScannerClose(scanner)) {
            return builder.build(); // bypass
          }
        }
        rsh = scanners.remove(scannerName);
        if (rsh != null) {
          scanner = rsh.s;
          scanner.close();
          regionServer.leases.cancelLease(scannerName);
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postScannerClose(scanner);
          }
        }
      }

      // A ttl is only reported while it is positive; it is reset to 0 on the close path above.
      if (ttl > 0) {
        builder.setTtl(ttl);
      }
      builder.setScannerId(scannerId);
      builder.setMoreResults(moreResults);
      return builder.build();
    } catch (IOException ie) {
      // On NotServingRegionException the scanner is unregistered, closed and its lease
      // cancelled; a failure while doing so is only logged.
      if (scannerName != null && ie instanceof NotServingRegionException) {
        RegionScannerHolder rsh = scanners.remove(scannerName);
        if (rsh != null) {
          try {
            RegionScanner scanner = rsh.s;
            LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ...");
            scanner.close();
            regionServer.leases.cancelLease(scannerName);
          } catch (IOException e) {
            LOG.warn("Getting exception closing " + scannerName, e);
          }
        }
      }
      throw new ServiceException(ie);
    } finally {
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Executes a region server coprocessor service call by delegating to
   * {@code regionServer.execRegionServerService}.
   *
   * @param controller the RPC controller
   * @param request the coprocessor service request
   * @return the response produced by the region server
   * @throws ServiceException if the delegate throws it
   */
  @Override
  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
      CoprocessorServiceRequest request) throws ServiceException {
    return regionServer.execRegionServerService(controller, request);
  }

  /**
   * Asks the region server to update its configuration.
   *
   * @param controller the RPC controller (not used here)
   * @param request the update request (not read here)
   * @return the default {@link UpdateConfigurationResponse} instance
   * @throws ServiceException wrapping any exception thrown by the region server
   */
  @Override
  public UpdateConfigurationResponse updateConfiguration(
      RpcController controller, UpdateConfigurationRequest request)
      throws ServiceException {
    try {
      this.regionServer.updateConfiguration();
    } catch (Exception e) {
      throw new ServiceException(e);
    }
    return UpdateConfigurationResponse.getDefaultInstance();
  }
}