1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.apache.hadoop.hbase.regionserver;
20 
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.net.BindException;
24 import java.net.InetSocketAddress;
25 import java.net.UnknownHostException;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Map.Entry;
33 import java.util.NavigableMap;
34 import java.util.Set;
35 import java.util.TreeSet;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.atomic.AtomicLong;
38 
39 import org.apache.commons.logging.Log;
40 import org.apache.commons.logging.LogFactory;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.hbase.Cell;
43 import org.apache.hadoop.hbase.CellScannable;
44 import org.apache.hadoop.hbase.CellScanner;
45 import org.apache.hadoop.hbase.CellUtil;
46 import org.apache.hadoop.hbase.DoNotRetryIOException;
47 import org.apache.hadoop.hbase.DroppedSnapshotException;
48 import org.apache.hadoop.hbase.HBaseIOException;
49 import org.apache.hadoop.hbase.HConstants;
50 import org.apache.hadoop.hbase.HRegionInfo;
51 import org.apache.hadoop.hbase.HTableDescriptor;
52 import org.apache.hadoop.hbase.MetaTableAccessor;
53 import org.apache.hadoop.hbase.MultiActionResultTooLarge;
54 import org.apache.hadoop.hbase.NotServingRegionException;
55 import org.apache.hadoop.hbase.ServerName;
56 import org.apache.hadoop.hbase.TableName;
57 import org.apache.hadoop.hbase.UnknownScannerException;
58 import org.apache.hadoop.hbase.classification.InterfaceAudience;
59 import org.apache.hadoop.hbase.client.Append;
60 import org.apache.hadoop.hbase.client.ConnectionUtils;
61 import org.apache.hadoop.hbase.client.Delete;
62 import org.apache.hadoop.hbase.client.Durability;
63 import org.apache.hadoop.hbase.client.Get;
64 import org.apache.hadoop.hbase.client.Increment;
65 import org.apache.hadoop.hbase.client.Mutation;
66 import org.apache.hadoop.hbase.client.Put;
67 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
68 import org.apache.hadoop.hbase.client.Result;
69 import org.apache.hadoop.hbase.client.RowMutations;
70 import org.apache.hadoop.hbase.client.Scan;
71 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
72 import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
73 import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
74 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
75 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
76 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
77 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
78 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
79 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
80 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
81 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
82 import org.apache.hadoop.hbase.ipc.PriorityFunction;
83 import org.apache.hadoop.hbase.ipc.QosPriority;
84 import org.apache.hadoop.hbase.ipc.RpcCallContext;
85 import org.apache.hadoop.hbase.ipc.RpcServer;
86 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
87 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
88 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
89 import org.apache.hadoop.hbase.ipc.ServerRpcController;
90 import org.apache.hadoop.hbase.master.MasterRpcServices;
91 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
92 import org.apache.hadoop.hbase.protobuf.RequestConverter;
93 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
94 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
95 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
96 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
97 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
98 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
99 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
100 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
101 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
102 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
103 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
104 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
105 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
106 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
107 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
108 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
109 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
110 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
111 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
112 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
113 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
114 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
115 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
116 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
117 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
118 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
119 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
120 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
121 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
122 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
123 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
124 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
125 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
126 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
127 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
128 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
129 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
130 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
131 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
132 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
133 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
134 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
135 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
136 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
137 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
138 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
139 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
141 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
142 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
143 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
144 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
145 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
146 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
147 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
148 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
149 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
150 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
151 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
152 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
153 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
154 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
155 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
156 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics;
157 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
158 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
159 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
160 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
161 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
162 import org.apache.hadoop.hbase.quotas.OperationQuota;
163 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
164 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
165 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
166 import org.apache.hadoop.hbase.regionserver.Region.Operation;
167 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
168 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
169 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
170 import org.apache.hadoop.hbase.wal.WAL;
171 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
172 import org.apache.hadoop.hbase.security.User;
173 import org.apache.hadoop.hbase.util.Bytes;
174 import org.apache.hadoop.hbase.util.Counter;
175 import org.apache.hadoop.hbase.util.DNS;
176 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
177 import org.apache.hadoop.hbase.util.Pair;
178 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
179 import org.apache.hadoop.hbase.util.Strings;
180 import org.apache.hadoop.hbase.wal.WALKey;
181 import org.apache.hadoop.hbase.wal.WALSplitter;
182 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
183 import org.apache.zookeeper.KeeperException;
184 
185 import com.google.common.annotations.VisibleForTesting;
186 import com.google.protobuf.ByteString;
187 import com.google.protobuf.Message;
188 import com.google.protobuf.RpcController;
189 import com.google.protobuf.ServiceException;
190 import com.google.protobuf.TextFormat;
191 
192 /**
193  * Implements the regionserver RPC services.
194  */
195 @InterfaceAudience.Private
196 @SuppressWarnings("deprecation")
197 public class RSRpcServices implements HBaseRPCErrorHandler,
198     AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
199     ConfigurationObserver {
200   protected static final Log LOG = LogFactory.getLog(RSRpcServices.class);
201 
202   /** RPC scheduler to use for the region server. */
203   public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
204     "hbase.region.server.rpc.scheduler.factory.class";
205 
206   /**
207    * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
208    * configuration exists to prevent the scenario where a time limit is specified to be so
209    * restrictive that the time limit is reached immediately (before any cells are scanned).
210    */
211   private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
212       "hbase.region.server.rpc.minimum.scan.time.limit.delta";
213   /**
214    * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
215    */
216   private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
217 
218   // Request counter. (Includes requests that are not serviced by regions.)
219   final Counter requestCount = new Counter();
220   // Server to handle client requests.
221   final RpcServerInterface rpcServer;
222   final InetSocketAddress isa;
223 
224   private final HRegionServer regionServer;
225   private final long maxScannerResultSize;
226 
227   // The reference to the priority extraction function
228   private final PriorityFunction priority;
229 
230   private final AtomicLong scannerIdGen = new AtomicLong(0L);
231   private final ConcurrentHashMap<String, RegionScannerHolder> scanners =
232     new ConcurrentHashMap<String, RegionScannerHolder>();
233 
234   /**
235    * The lease timeout period for client scanners (milliseconds).
236    */
237   private final int scannerLeaseTimeoutPeriod;
238 
239   /**
240    * The RPC timeout period (milliseconds)
241    */
242   private final int rpcTimeout;
243 
244   /**
245    * The minimum allowable delta to use for the scan limit
246    */
247   private final long minimumScanTimeLimitDelta;
248 
249   /**
250    * Holder class which holds the RegionScanner and nextCallSeq together.
251    */
252   private static class RegionScannerHolder {
253     private AtomicLong nextCallSeq = new AtomicLong(0);
254     private RegionScanner s;
255     private Region r;
256 
RegionScannerHolder(RegionScanner s, Region r)257     public RegionScannerHolder(RegionScanner s, Region r) {
258       this.s = s;
259       this.r = r;
260     }
261 
getNextCallSeq()262     private long getNextCallSeq() {
263       return nextCallSeq.get();
264     }
265 
incNextCallSeq()266     private void incNextCallSeq() {
267       nextCallSeq.incrementAndGet();
268     }
269 
rollbackNextCallSeq()270     private void rollbackNextCallSeq() {
271       nextCallSeq.decrementAndGet();
272     }
273   }
274 
275   /**
276    * Instantiated as a scanner lease. If the lease times out, the scanner is
277    * closed
278    */
279   private class ScannerListener implements LeaseListener {
280     private final String scannerName;
281 
ScannerListener(final String n)282     ScannerListener(final String n) {
283       this.scannerName = n;
284     }
285 
286     @Override
leaseExpired()287     public void leaseExpired() {
288       RegionScannerHolder rsh = scanners.remove(this.scannerName);
289       if (rsh != null) {
290         RegionScanner s = rsh.s;
291         LOG.info("Scanner " + this.scannerName + " lease expired on region "
292           + s.getRegionInfo().getRegionNameAsString());
293         try {
294           Region region = regionServer.getRegion(s.getRegionInfo().getRegionName());
295           if (region != null && region.getCoprocessorHost() != null) {
296             region.getCoprocessorHost().preScannerClose(s);
297           }
298 
299           s.close();
300           if (region != null && region.getCoprocessorHost() != null) {
301             region.getCoprocessorHost().postScannerClose(s);
302           }
303         } catch (IOException e) {
304           LOG.error("Closing scanner for "
305             + s.getRegionInfo().getRegionNameAsString(), e);
306         }
307       } else {
308         LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
309           " scanner found, hence no chance to close that related scanner!");
310       }
311     }
312   }
313 
getResultOrException( final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats)314   private static ResultOrException getResultOrException(
315       final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats) {
316     return getResultOrException(ResponseConverter.buildActionResult(r, stats), index);
317   }
318 
getResultOrException(final Exception e, final int index)319   private static ResultOrException getResultOrException(final Exception e, final int index) {
320     return getResultOrException(ResponseConverter.buildActionResult(e), index);
321   }
322 
getResultOrException( final ResultOrException.Builder builder, final int index)323   private static ResultOrException getResultOrException(
324       final ResultOrException.Builder builder, final int index) {
325     return builder.setIndex(index).build();
326   }
327 
328   /**
329    * Starts the nonce operation for a mutation, if needed.
330    * @param mutation Mutation.
331    * @param nonceGroup Nonce group from the request.
332    * @returns Nonce used (can be NO_NONCE).
333    */
startNonceOperation(final MutationProto mutation, long nonceGroup)334   private long startNonceOperation(final MutationProto mutation, long nonceGroup)
335       throws IOException, OperationConflictException {
336     if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
337     boolean canProceed = false;
338     try {
339       canProceed = regionServer.nonceManager.startOperation(
340         nonceGroup, mutation.getNonce(), regionServer);
341     } catch (InterruptedException ex) {
342       throw new InterruptedIOException("Nonce start operation interrupted");
343     }
344     if (!canProceed) {
345       // TODO: instead, we could convert append/increment to get w/mvcc
346       String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
347         + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
348         + "] may have already completed";
349       throw new OperationConflictException(message);
350     }
351     return mutation.getNonce();
352   }
353 
354   /**
355    * Ends nonce operation for a mutation, if needed.
356    * @param mutation Mutation.
357    * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
358    * @param success Whether the operation for this nonce has succeeded.
359    */
endNonceOperation(final MutationProto mutation, long nonceGroup, boolean success)360   private void endNonceOperation(final MutationProto mutation,
361       long nonceGroup, boolean success) {
362     if (regionServer.nonceManager != null && mutation.hasNonce()) {
363       regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
364     }
365   }
366 
367   /**
368    * @return True if current call supports cellblocks
369    */
isClientCellBlockSupport()370   private boolean isClientCellBlockSupport() {
371     RpcCallContext context = RpcServer.getCurrentCall();
372     return context != null && context.isClientCellBlockSupported();
373   }
374 
addResult(final MutateResponse.Builder builder, final Result result, final PayloadCarryingRpcController rpcc)375   private void addResult(final MutateResponse.Builder builder,
376       final Result result, final PayloadCarryingRpcController rpcc) {
377     if (result == null) return;
378     if (isClientCellBlockSupport()) {
379       builder.setResult(ProtobufUtil.toResultNoData(result));
380       rpcc.setCellScanner(result.cellScanner());
381     } else {
382       ClientProtos.Result pbr = ProtobufUtil.toResult(result);
383       builder.setResult(pbr);
384     }
385   }
386 
addResults(final ScanResponse.Builder builder, final List<Result> results, final RpcController controller, boolean isDefaultRegion)387   private void addResults(final ScanResponse.Builder builder, final List<Result> results,
388       final RpcController controller, boolean isDefaultRegion) {
389     builder.setStale(!isDefaultRegion);
390     if (results == null || results.isEmpty()) return;
391     if (isClientCellBlockSupport()) {
392       for (Result res : results) {
393         builder.addCellsPerResult(res.size());
394         builder.addPartialFlagPerResult(res.isPartial());
395       }
396       ((PayloadCarryingRpcController)controller).
397         setCellScanner(CellUtil.createCellScanner(results));
398     } else {
399       for (Result res: results) {
400         ClientProtos.Result pbr = ProtobufUtil.toResult(res);
401         builder.addResults(pbr);
402       }
403     }
404   }
405 
406   /**
407    * Mutate a list of rows atomically.
408    *
409    * @param region
410    * @param actions
411    * @param cellScanner if non-null, the mutation data -- the Cell content.
412    * @throws IOException
413    */
mutateRows(final Region region, final List<ClientProtos.Action> actions, final CellScanner cellScanner)414   private ClientProtos.RegionLoadStats mutateRows(final Region region,
415       final List<ClientProtos.Action> actions,
416       final CellScanner cellScanner) throws IOException {
417     if (!region.getRegionInfo().isMetaTable()) {
418       regionServer.cacheFlusher.reclaimMemStoreMemory();
419     }
420     RowMutations rm = null;
421     for (ClientProtos.Action action: actions) {
422       if (action.hasGet()) {
423         throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
424           action.getGet());
425       }
426       MutationType type = action.getMutation().getMutateType();
427       if (rm == null) {
428         rm = new RowMutations(action.getMutation().getRow().toByteArray());
429       }
430       switch (type) {
431         case PUT:
432           rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
433           break;
434         case DELETE:
435           rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
436           break;
437         default:
438           throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
439       }
440     }
441     region.mutateRow(rm);
442     return ((HRegion)region).getRegionStats();
443   }
444 
445   /**
446    * Mutate a list of rows atomically.
447    *
448    * @param region
449    * @param actions
450    * @param cellScanner if non-null, the mutation data -- the Cell content.
451    * @param row
452    * @param family
453    * @param qualifier
454    * @param compareOp
455    * @param comparator @throws IOException
456    */
checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions, final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator)457   private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions,
458       final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
459       CompareOp compareOp, ByteArrayComparable comparator) throws IOException {
460     if (!region.getRegionInfo().isMetaTable()) {
461       regionServer.cacheFlusher.reclaimMemStoreMemory();
462     }
463     RowMutations rm = null;
464     for (ClientProtos.Action action: actions) {
465       if (action.hasGet()) {
466         throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
467             action.getGet());
468       }
469       MutationType type = action.getMutation().getMutateType();
470       if (rm == null) {
471         rm = new RowMutations(action.getMutation().getRow().toByteArray());
472       }
473       switch (type) {
474         case PUT:
475           rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
476           break;
477         case DELETE:
478           rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
479           break;
480         default:
481           throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
482       }
483     }
484     return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE);
485   }
486 
487   /**
488    * Execute an append mutation.
489    *
490    * @param region
491    * @param m
492    * @param cellScanner
493    * @return result to return to client if default operation should be
494    * bypassed as indicated by RegionObserver, null otherwise
495    * @throws IOException
496    */
append(final Region region, final OperationQuota quota, final MutationProto m, final CellScanner cellScanner, long nonceGroup)497   private Result append(final Region region, final OperationQuota quota, final MutationProto m,
498       final CellScanner cellScanner, long nonceGroup) throws IOException {
499     long before = EnvironmentEdgeManager.currentTime();
500     Append append = ProtobufUtil.toAppend(m, cellScanner);
501     quota.addMutation(append);
502     Result r = null;
503     if (region.getCoprocessorHost() != null) {
504       r = region.getCoprocessorHost().preAppend(append);
505     }
506     if (r == null) {
507       long nonce = startNonceOperation(m, nonceGroup);
508       boolean success = false;
509       try {
510         r = region.append(append, nonceGroup, nonce);
511         success = true;
512       } finally {
513         endNonceOperation(m, nonceGroup, success);
514       }
515       if (region.getCoprocessorHost() != null) {
516         region.getCoprocessorHost().postAppend(append, r);
517       }
518     }
519     if (regionServer.metricsRegionServer != null) {
520       regionServer.metricsRegionServer.updateAppend(
521         EnvironmentEdgeManager.currentTime() - before);
522     }
523     return r;
524   }
525 
526   /**
527    * Execute an increment mutation.
528    *
529    * @param region
530    * @param mutation
531    * @return the Result
532    * @throws IOException
533    */
increment(final Region region, final OperationQuota quota, final MutationProto mutation, final CellScanner cells, long nonceGroup)534   private Result increment(final Region region, final OperationQuota quota,
535       final MutationProto mutation, final CellScanner cells, long nonceGroup) throws IOException {
536     long before = EnvironmentEdgeManager.currentTime();
537     Increment increment = ProtobufUtil.toIncrement(mutation, cells);
538     quota.addMutation(increment);
539     Result r = null;
540     if (region.getCoprocessorHost() != null) {
541       r = region.getCoprocessorHost().preIncrement(increment);
542     }
543     if (r == null) {
544       long nonce = startNonceOperation(mutation, nonceGroup);
545       boolean success = false;
546       try {
547         r = region.increment(increment, nonceGroup, nonce);
548         success = true;
549       } finally {
550         endNonceOperation(mutation, nonceGroup, success);
551       }
552       if (region.getCoprocessorHost() != null) {
553         r = region.getCoprocessorHost().postIncrement(increment, r);
554       }
555     }
556     if (regionServer.metricsRegionServer != null) {
557       regionServer.metricsRegionServer.updateIncrement(
558         EnvironmentEdgeManager.currentTime() - before);
559     }
560     return r;
561   }
562 
563   /**
564    * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
565    * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
566    * @param region
567    * @param actions
568    * @param cellScanner
569    * @param builder
570    * @param cellsToReturn  Could be null. May be allocated in this method.  This is what this
571    * method returns as a 'result'.
572    * @return Return the <code>cellScanner</code> passed
573    */
doNonAtomicRegionMutation(final Region region, final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner, final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup)574   private List<CellScannable> doNonAtomicRegionMutation(final Region region,
575       final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
576       final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
577     // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
578     // one at a time, we instead pass them in batch.  Be aware that the corresponding
579     // ResultOrException instance that matches each Put or Delete is then added down in the
580     // doBatchOp call.  We should be staying aligned though the Put and Delete are deferred/batched
581     List<ClientProtos.Action> mutations = null;
582     long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
583     RpcCallContext context = RpcServer.getCurrentCall();
584     IOException sizeIOE = null;
585     Object lastBlock = null;
586     for (ClientProtos.Action action : actions.getActionList()) {
587       ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
588       try {
589         Result r = null;
590 
591         if (context != null
592             && context.isRetryImmediatelySupported()
593             && (context.getResponseCellSize() > maxQuotaResultSize
594               || context.getResponseBlockSize() > maxQuotaResultSize)) {
595 
596           // We're storing the exception since the exception and reason string won't
597           // change after the response size limit is reached.
598           if (sizeIOE == null ) {
599             // We don't need the stack un-winding do don't throw the exception.
600             // Throwing will kill the JVM's JIT.
601             //
602             // Instead just create the exception and then store it.
603             sizeIOE = new MultiActionResultTooLarge("Max size exceeded"
604                 + " CellSize: " + context.getResponseCellSize()
605                 + " BlockSize: " + context.getResponseBlockSize());
606 
607             // Only report the exception once since there's only one request that
608             // caused the exception. Otherwise this number will dominate the exceptions count.
609             rpcServer.getMetrics().exception(sizeIOE);
610           }
611 
612           // Now that there's an exception is known to be created
613           // use it for the response.
614           //
615           // This will create a copy in the builder.
616           resultOrExceptionBuilder = ResultOrException.newBuilder().
617               setException(ResponseConverter.buildException(sizeIOE));
618           resultOrExceptionBuilder.setIndex(action.getIndex());
619           builder.addResultOrException(resultOrExceptionBuilder.build());
620           if (cellScanner != null) {
621             skipCellsForMutation(action, cellScanner);
622           }
623           continue;
624         }
625         if (action.hasGet()) {
626           long before = EnvironmentEdgeManager.currentTime();
627           try {
628             Get get = ProtobufUtil.toGet(action.getGet());
629             r = region.get(get);
630           } finally {
631             if (regionServer.metricsRegionServer != null) {
632               regionServer.metricsRegionServer.updateGet(
633                 EnvironmentEdgeManager.currentTime() - before);
634             }
635           }
636         } else if (action.hasServiceCall()) {
637           resultOrExceptionBuilder = ResultOrException.newBuilder();
638           try {
639             Message result = execServiceOnRegion(region, action.getServiceCall());
640             ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
641                 ClientProtos.CoprocessorServiceResult.newBuilder();
642             resultOrExceptionBuilder.setServiceResult(
643                 serviceResultBuilder.setValue(
644                   serviceResultBuilder.getValueBuilder()
645                     .setName(result.getClass().getName())
646                     .setValue(result.toByteString())));
647           } catch (IOException ioe) {
648             rpcServer.getMetrics().exception(ioe);
649             resultOrExceptionBuilder.setException(ResponseConverter.buildException(ioe));
650           }
651         } else if (action.hasMutation()) {
652           MutationType type = action.getMutation().getMutateType();
653           if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
654               !mutations.isEmpty()) {
655             // Flush out any Puts or Deletes already collected.
656             doBatchOp(builder, region, quota, mutations, cellScanner);
657             mutations.clear();
658           }
659           switch (type) {
660           case APPEND:
661             r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
662             break;
663           case INCREMENT:
664             r = increment(region, quota, action.getMutation(), cellScanner,  nonceGroup);
665             break;
666           case PUT:
667           case DELETE:
668             // Collect the individual mutations and apply in a batch
669             if (mutations == null) {
670               mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
671             }
672             mutations.add(action);
673             break;
674           default:
675             throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
676           }
677         } else {
678           throw new HBaseIOException("Unexpected Action type");
679         }
680         if (r != null) {
681           ClientProtos.Result pbResult = null;
682           if (isClientCellBlockSupport()) {
683             pbResult = ProtobufUtil.toResultNoData(r);
684             //  Hard to guess the size here.  Just make a rough guess.
685             if (cellsToReturn == null) {
686               cellsToReturn = new ArrayList<CellScannable>();
687             }
688             cellsToReturn.add(r);
689           } else {
690             pbResult = ProtobufUtil.toResult(r);
691           }
692           lastBlock = addSize(context, r, lastBlock);
693           resultOrExceptionBuilder =
694             ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
695         }
696         // Could get to here and there was no result and no exception.  Presumes we added
697         // a Put or Delete to the collecting Mutations List for adding later.  In this
698         // case the corresponding ResultOrException instance for the Put or Delete will be added
699         // down in the doBatchOp method call rather than up here.
700       } catch (IOException ie) {
701         rpcServer.getMetrics().exception(ie);
702         resultOrExceptionBuilder = ResultOrException.newBuilder().
703           setException(ResponseConverter.buildException(ie));
704       }
705       if (resultOrExceptionBuilder != null) {
706         // Propagate index.
707         resultOrExceptionBuilder.setIndex(action.getIndex());
708         builder.addResultOrException(resultOrExceptionBuilder.build());
709       }
710     }
711     // Finish up any outstanding mutations
712     if (mutations != null && !mutations.isEmpty()) {
713       doBatchOp(builder, region, quota, mutations, cellScanner);
714     }
715     return cellsToReturn;
716   }
717 
718   /**
719    * Execute a list of Put/Delete mutations.
720    *
721    * @param builder
722    * @param region
723    * @param mutations
724    */
doBatchOp(final RegionActionResult.Builder builder, final Region region, final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells)725   private void doBatchOp(final RegionActionResult.Builder builder, final Region region,
726       final OperationQuota quota,
727       final List<ClientProtos.Action> mutations, final CellScanner cells) {
728     Mutation[] mArray = new Mutation[mutations.size()];
729     long before = EnvironmentEdgeManager.currentTime();
730     boolean batchContainsPuts = false, batchContainsDelete = false;
731     try {
732       int i = 0;
733       for (ClientProtos.Action action: mutations) {
734         MutationProto m = action.getMutation();
735         Mutation mutation;
736         if (m.getMutateType() == MutationType.PUT) {
737           mutation = ProtobufUtil.toPut(m, cells);
738           batchContainsPuts = true;
739         } else {
740           mutation = ProtobufUtil.toDelete(m, cells);
741           batchContainsDelete = true;
742         }
743         mArray[i++] = mutation;
744         quota.addMutation(mutation);
745       }
746 
747       if (!region.getRegionInfo().isMetaTable()) {
748         regionServer.cacheFlusher.reclaimMemStoreMemory();
749       }
750 
751       OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE,
752         HConstants.NO_NONCE);
753       for (i = 0; i < codes.length; i++) {
754         int index = mutations.get(i).getIndex();
755         Exception e = null;
756         switch (codes[i].getOperationStatusCode()) {
757           case BAD_FAMILY:
758             e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
759             builder.addResultOrException(getResultOrException(e, index));
760             break;
761 
762           case SANITY_CHECK_FAILURE:
763             e = new FailedSanityCheckException(codes[i].getExceptionMsg());
764             builder.addResultOrException(getResultOrException(e, index));
765             break;
766 
767           default:
768             e = new DoNotRetryIOException(codes[i].getExceptionMsg());
769             builder.addResultOrException(getResultOrException(e, index));
770             break;
771 
772           case SUCCESS:
773             builder.addResultOrException(getResultOrException(
774                 ClientProtos.Result.getDefaultInstance(), index,
775                 ((HRegion) region).getRegionStats()));
776             break;
777         }
778       }
779     } catch (IOException ie) {
780       for (int i = 0; i < mutations.size(); i++) {
781         builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
782       }
783     }
784     if (regionServer.metricsRegionServer != null) {
785       long after = EnvironmentEdgeManager.currentTime();
786       if (batchContainsPuts) {
787         regionServer.metricsRegionServer.updatePut(after - before);
788       }
789       if (batchContainsDelete) {
790         regionServer.metricsRegionServer.updateDelete(after - before);
791       }
792     }
793   }
794 
795   /**
796    * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
797    * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
798    * @param region
799    * @param mutations
800    * @param replaySeqId
801    * @return an array of OperationStatus which internally contains the OperationStatusCode and the
802    *         exceptionMessage if any
803    * @throws IOException
804    */
doReplayBatchOp(final Region region, final List<WALSplitter.MutationReplay> mutations, long replaySeqId)805   private OperationStatus [] doReplayBatchOp(final Region region,
806       final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
807     long before = EnvironmentEdgeManager.currentTime();
808     boolean batchContainsPuts = false, batchContainsDelete = false;
809     try {
810       for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
811         WALSplitter.MutationReplay m = it.next();
812 
813         if (m.type == MutationType.PUT) {
814           batchContainsPuts = true;
815         } else {
816           batchContainsDelete = true;
817         }
818 
819         NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
820         List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
821         if (metaCells != null && !metaCells.isEmpty()) {
822           for (Cell metaCell : metaCells) {
823             CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
824             boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
825             HRegion hRegion = (HRegion)region;
826             if (compactionDesc != null) {
827               // replay the compaction. Remove the files from stores only if we are the primary
828               // region replica (thus own the files)
829               hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
830                 replaySeqId);
831               continue;
832             }
833             FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
834             if (flushDesc != null && !isDefaultReplica) {
835               hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
836               continue;
837             }
838             RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
839             if (regionEvent != null && !isDefaultReplica) {
840               hRegion.replayWALRegionEventMarker(regionEvent);
841               continue;
842             }
843             BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
844             if (bulkLoadEvent != null) {
845               hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
846               continue;
847             }
848           }
849           it.remove();
850         }
851       }
852       requestCount.add(mutations.size());
853       if (!region.getRegionInfo().isMetaTable()) {
854         regionServer.cacheFlusher.reclaimMemStoreMemory();
855       }
856       return region.batchReplay(mutations.toArray(
857         new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
858     } finally {
859       if (regionServer.metricsRegionServer != null) {
860         long after = EnvironmentEdgeManager.currentTime();
861           if (batchContainsPuts) {
862           regionServer.metricsRegionServer.updatePut(after - before);
863         }
864         if (batchContainsDelete) {
865           regionServer.metricsRegionServer.updateDelete(after - before);
866         }
867       }
868     }
869   }
870 
closeAllScanners()871   private void closeAllScanners() {
872     // Close any outstanding scanners. Means they'll get an UnknownScanner
873     // exception next time they come in.
874     for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
875       try {
876         e.getValue().s.close();
877       } catch (IOException ioe) {
878         LOG.warn("Closing scanner " + e.getKey(), ioe);
879       }
880     }
881   }
882 
RSRpcServices(HRegionServer rs)883   public RSRpcServices(HRegionServer rs) throws IOException {
884     regionServer = rs;
885 
886     RpcSchedulerFactory rpcSchedulerFactory;
887     try {
888       Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
889           REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
890           SimpleRpcSchedulerFactory.class);
891       rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
892     } catch (InstantiationException e) {
893       throw new IllegalArgumentException(e);
894     } catch (IllegalAccessException e) {
895       throw new IllegalArgumentException(e);
896     }
897     // Server to handle client requests.
898     InetSocketAddress initialIsa;
899     InetSocketAddress bindAddress;
900     if(this instanceof MasterRpcServices) {
901       String hostname = getHostname(rs.conf, true);
902       int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
903       // Creation of a HSA will force a resolve.
904       initialIsa = new InetSocketAddress(hostname, port);
905       bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port);
906     } else {
907       String hostname = getHostname(rs.conf, false);
908       int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
909         HConstants.DEFAULT_REGIONSERVER_PORT);
910       // Creation of a HSA will force a resolve.
911       initialIsa = new InetSocketAddress(hostname, port);
912       bindAddress = new InetSocketAddress(
913         rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
914     }
915     if (initialIsa.getAddress() == null) {
916       throw new IllegalArgumentException("Failed resolve of " + initialIsa);
917     }
918     priority = createPriority();
919     String name = rs.getProcessName() + "/" + initialIsa.toString();
920     // Set how many times to retry talking to another server over HConnection.
921     ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
922     try {
923       rpcServer = new RpcServer(rs, name, getServices(),
924           bindAddress, // use final bindAddress for this server.
925           rs.conf,
926           rpcSchedulerFactory.create(rs.conf, this, rs));
927     } catch (BindException be) {
928       String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT :
929           HConstants.REGIONSERVER_PORT;
930       throw new IOException(be.getMessage() + ". To switch ports use the '" + configName +
931           "' configuration property.", be.getCause() != null ? be.getCause() : be);
932     }
933 
934     scannerLeaseTimeoutPeriod = rs.conf.getInt(
935       HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
936       HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
937     maxScannerResultSize = rs.conf.getLong(
938       HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
939       HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
940     rpcTimeout = rs.conf.getInt(
941       HConstants.HBASE_RPC_TIMEOUT_KEY,
942       HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
943     minimumScanTimeLimitDelta = rs.conf.getLong(
944       REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
945       DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
946 
947     InetSocketAddress address = rpcServer.getListenerAddress();
948     if (address == null) {
949       throw new IOException("Listener channel is closed");
950     }
951     // Set our address, however we need the final port that was given to rpcServer
952     isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
953     rpcServer.setErrorHandler(this);
954     rs.setName(name);
955   }
956 
957   @Override
onConfigurationChange(Configuration newConf)958   public void onConfigurationChange(Configuration newConf) {
959     if (rpcServer instanceof ConfigurationObserver) {
960       ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf);
961     }
962   }
963 
createPriority()964   protected PriorityFunction createPriority() {
965     return new AnnotationReadingPriorityFunction(this);
966   }
967 
getHostname(Configuration conf, boolean isMaster)968   public static String getHostname(Configuration conf, boolean isMaster)
969       throws UnknownHostException {
970     String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY :
971       HRegionServer.RS_HOSTNAME_KEY);
972     if (hostname == null || hostname.isEmpty()) {
973       String masterOrRS = isMaster ? "master" : "regionserver";
974       return Strings.domainNamePointerToHostName(DNS.getDefaultHost(
975         conf.get("hbase." + masterOrRS + ".dns.interface", "default"),
976         conf.get("hbase." + masterOrRS + ".dns.nameserver", "default")));
977     } else {
978       LOG.info("hostname is configured to be " + hostname);
979       return hostname;
980     }
981   }
982 
getScanner(long scannerId)983   RegionScanner getScanner(long scannerId) {
984     String scannerIdString = Long.toString(scannerId);
985     RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
986     if (scannerHolder != null) {
987       return scannerHolder.s;
988     }
989     return null;
990   }
991 
992   /**
993    * Get the vtime associated with the scanner.
994    * Currently the vtime is the number of "next" calls.
995    */
getScannerVirtualTime(long scannerId)996   long getScannerVirtualTime(long scannerId) {
997     String scannerIdString = Long.toString(scannerId);
998     RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
999     if (scannerHolder != null) {
1000       return scannerHolder.getNextCallSeq();
1001     }
1002     return 0L;
1003   }
1004 
addScanner(RegionScanner s, Region r)1005   long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException {
1006     long scannerId = this.scannerIdGen.incrementAndGet();
1007     String scannerName = String.valueOf(scannerId);
1008 
1009     RegionScannerHolder existing =
1010       scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
1011     assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
1012 
1013     regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
1014         new ScannerListener(scannerName));
1015     return scannerId;
1016   }
1017 
1018   /**
1019    * Method to account for the size of retained cells and retained data blocks.
1020    * @return an object that represents the last referenced block from this response.
1021    */
addSize(RpcCallContext context, Result r, Object lastBlock)1022   Object addSize(RpcCallContext context, Result r, Object lastBlock) {
1023     if (context != null && !r.isEmpty()) {
1024       for (Cell c : r.rawCells()) {
1025         context.incrementResponseCellSize(CellUtil.estimatedHeapSizeOf(c));
1026         // We're using the last block being the same as the current block as
1027         // a proxy for pointing to a new block. This won't be exact.
1028         // If there are multiple gets that bounce back and forth
1029         // Then it's possible that this will over count the size of
1030         // referenced blocks. However it's better to over count and
1031         // use two RPC's than to OOME the RegionServer.
1032         byte[] valueArray = c.getValueArray();
1033         if (valueArray != lastBlock) {
1034           context.incrementResponseBlockSize(valueArray.length);
1035           lastBlock = valueArray;
1036         }
1037       }
1038     }
1039     return lastBlock;
1040   }
1041 
1042 
1043   /**
1044    * Find the HRegion based on a region specifier
1045    *
1046    * @param regionSpecifier the region specifier
1047    * @return the corresponding region
1048    * @throws IOException if the specifier is not null,
1049    *    but failed to find the region
1050    */
getRegion( final RegionSpecifier regionSpecifier)1051   Region getRegion(
1052       final RegionSpecifier regionSpecifier) throws IOException {
1053     return regionServer.getRegionByEncodedName(regionSpecifier.getValue().toByteArray(),
1054         ProtobufUtil.getRegionEncodedName(regionSpecifier));
1055   }
1056 
1057   @VisibleForTesting
getPriority()1058   public PriorityFunction getPriority() {
1059     return priority;
1060   }
1061 
1062   @VisibleForTesting
getConfiguration()1063   public Configuration getConfiguration() {
1064     return regionServer.getConfiguration();
1065   }
1066 
getQuotaManager()1067   private RegionServerQuotaManager getQuotaManager() {
1068     return regionServer.getRegionServerQuotaManager();
1069   }
1070 
start()1071   void start() {
1072     rpcServer.start();
1073   }
1074 
stop()1075   void stop() {
1076     closeAllScanners();
1077     rpcServer.stop();
1078   }
1079 
1080   /**
1081    * Called to verify that this server is up and running.
1082    *
1083    * @throws IOException
1084    */
checkOpen()1085   protected void checkOpen() throws IOException {
1086     if (regionServer.isAborted()) {
1087       throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
1088     }
1089     if (regionServer.isStopped()) {
1090       throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
1091     }
1092     if (!regionServer.fsOk) {
1093       throw new RegionServerStoppedException("File system not available");
1094     }
1095     if (!regionServer.isOnline()) {
1096       throw new ServerNotRunningYetException("Server is not running yet");
1097     }
1098   }
1099 
1100   /**
1101    * @return list of blocking services and their security info classes that this server supports
1102    */
getServices()1103   protected List<BlockingServiceAndInterface> getServices() {
1104     List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
1105     bssi.add(new BlockingServiceAndInterface(
1106       ClientService.newReflectiveBlockingService(this),
1107       ClientService.BlockingInterface.class));
1108     bssi.add(new BlockingServiceAndInterface(
1109       AdminService.newReflectiveBlockingService(this),
1110       AdminService.BlockingInterface.class));
1111     return bssi;
1112   }
1113 
getSocketAddress()1114   public InetSocketAddress getSocketAddress() {
1115     return isa;
1116   }
1117 
1118   @Override
getPriority(RequestHeader header, Message param, User user)1119   public int getPriority(RequestHeader header, Message param, User user) {
1120     return priority.getPriority(header, param, user);
1121   }
1122 
1123   @Override
getDeadline(RequestHeader header, Message param)1124   public long getDeadline(RequestHeader header, Message param) {
1125     return priority.getDeadline(header, param);
1126   }
1127 
1128   /*
1129    * Check if an OOME and, if so, abort immediately to avoid creating more objects.
1130    *
1131    * @param e
1132    *
1133    * @return True if we OOME'd and are aborting.
1134    */
1135   @Override
checkOOME(final Throwable e)1136   public boolean checkOOME(final Throwable e) {
1137     boolean stop = false;
1138     try {
1139       if (e instanceof OutOfMemoryError
1140           || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1141           || (e.getMessage() != null && e.getMessage().contains(
1142               "java.lang.OutOfMemoryError"))) {
1143         stop = true;
1144         LOG.fatal("Run out of memory; " + getClass().getSimpleName()
1145           + " will abort itself immediately", e);
1146       }
1147     } finally {
1148       if (stop) {
1149         Runtime.getRuntime().halt(1);
1150       }
1151     }
1152     return stop;
1153   }
1154 
1155   /**
1156    * Close a region on the region server.
1157    *
1158    * @param controller the RPC controller
1159    * @param request the request
1160    * @throws ServiceException
1161    */
1162   @Override
1163   @QosPriority(priority=HConstants.ADMIN_QOS)
closeRegion(final RpcController controller, final CloseRegionRequest request)1164   public CloseRegionResponse closeRegion(final RpcController controller,
1165       final CloseRegionRequest request) throws ServiceException {
1166     final ServerName sn = (request.hasDestinationServer() ?
1167       ProtobufUtil.toServerName(request.getDestinationServer()) : null);
1168 
1169     try {
1170       checkOpen();
1171       if (request.hasServerStartCode()) {
1172         // check that we are the same server that this RPC is intended for.
1173         long serverStartCode = request.getServerStartCode();
1174         if (regionServer.serverName.getStartcode() !=  serverStartCode) {
1175           throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1176               "different server with startCode: " + serverStartCode + ", this server is: "
1177               + regionServer.serverName));
1178         }
1179       }
1180       final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1181 
1182       // Can be null if we're calling close on a region that's not online
1183       final Region region = regionServer.getFromOnlineRegions(encodedRegionName);
1184       if ((region  != null) && (region .getCoprocessorHost() != null)) {
1185         region.getCoprocessorHost().preClose(false);
1186       }
1187 
1188       requestCount.increment();
1189       LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1190       CloseRegionCoordination.CloseRegionDetails crd = regionServer.getCoordinatedStateManager()
1191         .getCloseRegionCoordination().parseFromProtoRequest(request);
1192 
1193       boolean closed = regionServer.closeRegion(encodedRegionName, false, crd, sn);
1194       CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1195       return builder.build();
1196     } catch (IOException ie) {
1197       throw new ServiceException(ie);
1198     }
1199   }
1200 
1201   /**
1202    * Compact a region on the region server.
1203    *
1204    * @param controller the RPC controller
1205    * @param request the request
1206    * @throws ServiceException
1207    */
1208   @Override
1209   @QosPriority(priority=HConstants.ADMIN_QOS)
compactRegion(final RpcController controller, final CompactRegionRequest request)1210   public CompactRegionResponse compactRegion(final RpcController controller,
1211       final CompactRegionRequest request) throws ServiceException {
1212     try {
1213       checkOpen();
1214       requestCount.increment();
1215       Region region = getRegion(request.getRegion());
1216       region.startRegionOperation(Operation.COMPACT_REGION);
1217       LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1218       boolean major = false;
1219       byte [] family = null;
1220       Store store = null;
1221       if (request.hasFamily()) {
1222         family = request.getFamily().toByteArray();
1223         store = region.getStore(family);
1224         if (store == null) {
1225           throw new ServiceException(new IOException("column family " + Bytes.toString(family)
1226             + " does not exist in region " + region.getRegionInfo().getRegionNameAsString()));
1227         }
1228       }
1229       if (request.hasMajor()) {
1230         major = request.getMajor();
1231       }
1232       if (major) {
1233         if (family != null) {
1234           store.triggerMajorCompaction();
1235         } else {
1236           region.triggerMajorCompaction();
1237         }
1238       }
1239 
1240       String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
1241       if (LOG.isTraceEnabled()) {
1242         LOG.trace("User-triggered compaction requested for region "
1243           + region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
1244       }
1245       String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
1246       if(family != null) {
1247         regionServer.compactSplitThread.requestCompaction(region, store, log,
1248           Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1249       } else {
1250         regionServer.compactSplitThread.requestCompaction(region, log,
1251           Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1252       }
1253       return CompactRegionResponse.newBuilder().build();
1254     } catch (IOException ie) {
1255       throw new ServiceException(ie);
1256     }
1257   }
1258 
1259   /**
1260    * Flush a region on the region server.
1261    *
1262    * @param controller the RPC controller
1263    * @param request the request
1264    * @throws ServiceException
1265    */
1266   @Override
1267   @QosPriority(priority=HConstants.ADMIN_QOS)
flushRegion(final RpcController controller, final FlushRegionRequest request)1268   public FlushRegionResponse flushRegion(final RpcController controller,
1269       final FlushRegionRequest request) throws ServiceException {
1270     try {
1271       checkOpen();
1272       requestCount.increment();
1273       Region region = getRegion(request.getRegion());
1274       LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1275       boolean shouldFlush = true;
1276       if (request.hasIfOlderThanTs()) {
1277         shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1278       }
1279       FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1280       if (shouldFlush) {
1281         boolean writeFlushWalMarker =  request.hasWriteFlushWalMarker() ?
1282             request.getWriteFlushWalMarker() : false;
1283         long startTime = EnvironmentEdgeManager.currentTime();
1284         // Go behind the curtain so we can manage writing of the flush WAL marker
1285         HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
1286             ((HRegion)region).flushcache(true, writeFlushWalMarker);
1287         if (flushResult.isFlushSucceeded()) {
1288           long endTime = EnvironmentEdgeManager.currentTime();
1289           regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1290         }
1291         boolean compactionNeeded = flushResult.isCompactionNeeded();
1292         if (compactionNeeded) {
1293           regionServer.compactSplitThread.requestSystemCompaction(region,
1294             "Compaction through user triggered flush");
1295         }
1296         builder.setFlushed(flushResult.isFlushSucceeded());
1297         builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1298       }
1299       builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1300       return builder.build();
1301     } catch (DroppedSnapshotException ex) {
1302       // Cache flush can fail in a few places. If it fails in a critical
1303       // section, we get a DroppedSnapshotException and a replay of wal
1304       // is required. Currently the only way to do this is a restart of
1305       // the server.
1306       regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1307       throw new ServiceException(ex);
1308     } catch (IOException ie) {
1309       throw new ServiceException(ie);
1310     }
1311   }
1312 
1313   @Override
1314   @QosPriority(priority=HConstants.ADMIN_QOS)
1315   public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1316       final GetOnlineRegionRequest request) throws ServiceException {
1317     try {
1318       checkOpen();
1319       requestCount.increment();
1320       Map<String, Region> onlineRegions = regionServer.onlineRegions;
1321       List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
1322       for (Region region: onlineRegions.values()) {
1323         list.add(region.getRegionInfo());
1324       }
1325       Collections.sort(list);
1326       return ResponseConverter.buildGetOnlineRegionResponse(list);
1327     } catch (IOException ie) {
1328       throw new ServiceException(ie);
1329     }
1330   }
1331 
1332   @Override
1333   @QosPriority(priority=HConstants.ADMIN_QOS)
1334   public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1335       final GetRegionInfoRequest request) throws ServiceException {
1336     try {
1337       checkOpen();
1338       requestCount.increment();
1339       Region region = getRegion(request.getRegion());
1340       HRegionInfo info = region.getRegionInfo();
1341       GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1342       builder.setRegionInfo(HRegionInfo.convert(info));
1343       if (request.hasCompactionState() && request.getCompactionState()) {
1344         builder.setCompactionState(region.getCompactionState());
1345       }
1346       builder.setIsRecovering(region.isRecovering());
1347       return builder.build();
1348     } catch (IOException ie) {
1349       throw new ServiceException(ie);
1350     }
1351   }
1352 
1353   /**
1354    * Get some information of the region server.
1355    *
1356    * @param controller the RPC controller
1357    * @param request the request
1358    * @throws ServiceException
1359    */
1360   @Override
1361   @QosPriority(priority=HConstants.ADMIN_QOS)
1362   public GetServerInfoResponse getServerInfo(final RpcController controller,
1363       final GetServerInfoRequest request) throws ServiceException {
1364     try {
1365       checkOpen();
1366     } catch (IOException ie) {
1367       throw new ServiceException(ie);
1368     }
1369     requestCount.increment();
1370     int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
1371     return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
1372   }
1373 
1374   @Override
1375   @QosPriority(priority=HConstants.ADMIN_QOS)
1376   public GetStoreFileResponse getStoreFile(final RpcController controller,
1377       final GetStoreFileRequest request) throws ServiceException {
1378     try {
1379       checkOpen();
1380       Region region = getRegion(request.getRegion());
1381       requestCount.increment();
1382       Set<byte[]> columnFamilies;
1383       if (request.getFamilyCount() == 0) {
1384         columnFamilies = region.getTableDesc().getFamiliesKeys();
1385       } else {
1386         columnFamilies = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
1387         for (ByteString cf: request.getFamilyList()) {
1388           columnFamilies.add(cf.toByteArray());
1389         }
1390       }
1391       int nCF = columnFamilies.size();
1392       List<String>  fileList = region.getStoreFileList(
1393         columnFamilies.toArray(new byte[nCF][]));
1394       GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1395       builder.addAllStoreFile(fileList);
1396       return builder.build();
1397     } catch (IOException ie) {
1398       throw new ServiceException(ie);
1399     }
1400   }
1401 
1402   /**
1403    * Merge regions on the region server.
1404    *
1405    * @param controller the RPC controller
1406    * @param request the request
1407    * @return merge regions response
1408    * @throws ServiceException
1409    */
1410   @Override
1411   @QosPriority(priority = HConstants.ADMIN_QOS)
1412   public MergeRegionsResponse mergeRegions(final RpcController controller,
1413       final MergeRegionsRequest request) throws ServiceException {
1414     try {
1415       checkOpen();
1416       requestCount.increment();
1417       Region regionA = getRegion(request.getRegionA());
1418       Region regionB = getRegion(request.getRegionB());
1419       boolean forcible = request.getForcible();
1420       long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1421       regionA.startRegionOperation(Operation.MERGE_REGION);
1422       regionB.startRegionOperation(Operation.MERGE_REGION);
1423       if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
1424           regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1425         throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
1426       }
1427       LOG.info("Receiving merging request for  " + regionA + ", " + regionB
1428           + ",forcible=" + forcible);
1429       long startTime = EnvironmentEdgeManager.currentTime();
1430       FlushResult flushResult = regionA.flush(true);
1431       if (flushResult.isFlushSucceeded()) {
1432         long endTime = EnvironmentEdgeManager.currentTime();
1433         regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1434       }
1435       startTime = EnvironmentEdgeManager.currentTime();
1436       flushResult = regionB.flush(true);
1437       if (flushResult.isFlushSucceeded()) {
1438         long endTime = EnvironmentEdgeManager.currentTime();
1439         regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1440       }
1441       regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
1442           masterSystemTime, RpcServer.getRequestUser());
1443       return MergeRegionsResponse.newBuilder().build();
1444     } catch (DroppedSnapshotException ex) {
1445       regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1446       throw new ServiceException(ex);
1447     } catch (IOException ie) {
1448       throw new ServiceException(ie);
1449     }
1450   }
1451 
1452   /**
1453    * Open asynchronously a region or a set of regions on the region server.
1454    *
1455    * The opening is coordinated by ZooKeeper, and this method requires the znode to be created
1456    *  before being called. As a consequence, this method should be called only from the master.
1457    * <p>
1458    * Different manages states for the region are:
1459    * </p><ul>
1460    *  <li>region not opened: the region opening will start asynchronously.</li>
1461    *  <li>a close is already in progress: this is considered as an error.</li>
1462    *  <li>an open is already in progress: this new open request will be ignored. This is important
1463    *  because the Master can do multiple requests if it crashes.</li>
1464    *  <li>the region is already opened:  this new open request will be ignored.</li>
1465    *  </ul>
1466    * <p>
1467    * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
1468    * For a single region opening, errors are sent through a ServiceException. For bulk assign,
1469    * errors are put in the response as FAILED_OPENING.
1470    * </p>
1471    * @param controller the RPC controller
1472    * @param request the request
1473    * @throws ServiceException
1474    */
1475   @Override
1476   @QosPriority(priority=HConstants.ADMIN_QOS)
1477   public OpenRegionResponse openRegion(final RpcController controller,
1478       final OpenRegionRequest request) throws ServiceException {
1479     requestCount.increment();
1480     if (request.hasServerStartCode()) {
1481       // check that we are the same server that this RPC is intended for.
1482       long serverStartCode = request.getServerStartCode();
1483       if (regionServer.serverName.getStartcode() !=  serverStartCode) {
1484         throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1485             "different server with startCode: " + serverStartCode + ", this server is: "
1486             + regionServer.serverName));
1487       }
1488     }
1489 
1490     OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1491     final int regionCount = request.getOpenInfoCount();
1492     final Map<TableName, HTableDescriptor> htds =
1493         new HashMap<TableName, HTableDescriptor>(regionCount);
1494     final boolean isBulkAssign = regionCount > 1;
1495     try {
1496       checkOpen();
1497     } catch (IOException ie) {
1498       TableName tableName = null;
1499       if (regionCount == 1) {
1500         RegionInfo ri = request.getOpenInfo(0).getRegion();
1501         if (ri != null) {
1502           tableName = ProtobufUtil.toTableName(ri.getTableName());
1503         }
1504       }
1505       if (!TableName.META_TABLE_NAME.equals(tableName)) {
1506         throw new ServiceException(ie);
1507       }
1508       // We are assigning meta, wait a little for regionserver to finish initialization.
1509       int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1510         HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout
1511       long endTime = System.currentTimeMillis() + timeout;
1512       synchronized (regionServer.online) {
1513         try {
1514           while (System.currentTimeMillis() <= endTime
1515               && !regionServer.isStopped() && !regionServer.isOnline()) {
1516             regionServer.online.wait(regionServer.msgInterval);
1517           }
1518           checkOpen();
1519         } catch (InterruptedException t) {
1520           Thread.currentThread().interrupt();
1521           throw new ServiceException(t);
1522         } catch (IOException e) {
1523           throw new ServiceException(e);
1524         }
1525       }
1526     }
1527 
1528     long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1529 
1530     for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
1531       final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
1532       OpenRegionCoordination coordination = regionServer.getCoordinatedStateManager().
1533         getOpenRegionCoordination();
1534       OpenRegionCoordination.OpenRegionDetails ord =
1535         coordination.parseFromProtoRequest(regionOpenInfo);
1536 
1537       HTableDescriptor htd;
1538       try {
1539         final Region onlineRegion = regionServer.getFromOnlineRegions(region.getEncodedName());
1540         if (onlineRegion != null) {
1541           //Check if the region can actually be opened.
1542           if (onlineRegion.getCoprocessorHost() != null) {
1543             onlineRegion.getCoprocessorHost().preOpen();
1544           }
1545           // See HBASE-5094. Cross check with hbase:meta if still this RS is owning
1546           // the region.
1547           Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1548             regionServer.getConnection(), region.getRegionName());
1549           if (regionServer.serverName.equals(p.getSecond())) {
1550             Boolean closing = regionServer.regionsInTransitionInRS.get(region.getEncodedNameAsBytes());
1551             // Map regionsInTransitionInRSOnly has an entry for a region only if the region
1552             // is in transition on this RS, so here closing can be null. If not null, it can
1553             // be true or false. True means the region is opening on this RS; while false
1554             // means the region is closing. Only return ALREADY_OPENED if not closing (i.e.
1555             // not in transition any more, or still transition to open.
1556             if (!Boolean.FALSE.equals(closing)
1557                 && regionServer.getFromOnlineRegions(region.getEncodedName()) != null) {
1558               LOG.warn("Attempted open of " + region.getEncodedName()
1559                 + " but already online on this server");
1560               builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
1561               continue;
1562             }
1563           } else {
1564             LOG.warn("The region " + region.getEncodedName() + " is online on this server"
1565               + " but hbase:meta does not have this server - continue opening.");
1566             regionServer.removeFromOnlineRegions(onlineRegion, null);
1567           }
1568         }
1569         LOG.info("Open " + region.getRegionNameAsString());
1570         htd = htds.get(region.getTable());
1571         if (htd == null) {
1572           htd = regionServer.tableDescriptors.get(region.getTable());
1573           htds.put(region.getTable(), htd);
1574         }
1575 
1576         final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent(
1577           region.getEncodedNameAsBytes(), Boolean.TRUE);
1578 
1579         if (Boolean.FALSE.equals(previous)) {
1580           // There is a close in progress. We need to mark this open as failed in ZK.
1581 
1582           coordination.tryTransitionFromOfflineToFailedOpen(regionServer, region, ord);
1583 
1584           throw new RegionAlreadyInTransitionException("Received OPEN for the region:"
1585             + region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
1586         }
1587 
1588         if (Boolean.TRUE.equals(previous)) {
1589           // An open is in progress. This is supported, but let's log this.
1590           LOG.info("Receiving OPEN for the region:" +
1591             region.getRegionNameAsString() + " , which we are already trying to OPEN"
1592               + " - ignoring this new request for this region.");
1593         }
1594 
1595         // We are opening this region. If it moves back and forth for whatever reason, we don't
1596         // want to keep returning the stale moved record while we are opening/if we close again.
1597         regionServer.removeFromMovedRegions(region.getEncodedName());
1598 
1599         if (previous == null) {
1600           // check if the region to be opened is marked in recovering state in ZK
1601           if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
1602               region.getEncodedName())) {
1603             // Check if current region open is for distributedLogReplay. This check is to support
1604             // rolling restart/upgrade where we want to Master/RS see same configuration
1605             if (!regionOpenInfo.hasOpenForDistributedLogReplay()
1606                   || regionOpenInfo.getOpenForDistributedLogReplay()) {
1607               regionServer.recoveringRegions.put(region.getEncodedName(), null);
1608             } else {
1609               // Remove stale recovery region from ZK when we open region not for recovering which
1610               // could happen when turn distributedLogReplay off from on.
1611               List<String> tmpRegions = new ArrayList<String>();
1612               tmpRegions.add(region.getEncodedName());
1613               ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(),
1614                 tmpRegions);
1615             }
1616           }
1617           // If there is no action in progress, we can submit a specific handler.
1618           // Need to pass the expected version in the constructor.
1619           if (region.isMetaRegion()) {
1620             regionServer.service.submit(new OpenMetaHandler(
1621               regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
1622           } else {
1623             regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
1624               regionOpenInfo.getFavoredNodesList());
1625             regionServer.service.submit(new OpenRegionHandler(
1626               regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
1627           }
1628         }
1629 
1630         builder.addOpeningState(RegionOpeningState.OPENED);
1631 
1632       } catch (KeeperException zooKeeperEx) {
1633         LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
1634         throw new ServiceException(zooKeeperEx);
1635       } catch (IOException ie) {
1636         LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
1637         if (isBulkAssign) {
1638           builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
1639         } else {
1640           throw new ServiceException(ie);
1641         }
1642       }
1643     }
1644     return builder.build();
1645   }
1646 
1647   /**
1648    *  Wamrmup a region on this server.
1649    *
1650    * This method should only be called by Master. It synchrnously opens the region and
1651    * closes the region bringing the most important pages in cache.
1652    * <p>
1653    *
1654    * @param controller the RPC controller
1655    * @param request the request
1656    * @throws ServiceException
1657    */
1658   @Override
warmupRegion(final RpcController controller, final WarmupRegionRequest request)1659   public WarmupRegionResponse warmupRegion(final RpcController controller,
1660       final WarmupRegionRequest request) throws ServiceException {
1661 
1662     RegionInfo regionInfo = request.getRegionInfo();
1663     final HRegionInfo region = HRegionInfo.convert(regionInfo);
1664     HTableDescriptor htd;
1665     WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
1666 
1667     try {
1668       checkOpen();
1669       String encodedName = region.getEncodedName();
1670       byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1671       final Region onlineRegion = regionServer.getFromOnlineRegions(encodedName);
1672 
1673       if (onlineRegion != null) {
1674         LOG.info("Region already online. Skipping warming up " + region);
1675         return response;
1676       }
1677 
1678       if (LOG.isDebugEnabled()) {
1679         LOG.debug("Warming up Region " + region.getRegionNameAsString());
1680       }
1681 
1682       htd = regionServer.tableDescriptors.get(region.getTable());
1683 
1684       if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
1685         LOG.info("Region is in transition. Skipping warmup " + region);
1686         return response;
1687       }
1688 
1689       HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
1690           regionServer.getConfiguration(), regionServer, null);
1691 
1692     } catch (IOException ie) {
1693       LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
1694       throw new ServiceException(ie);
1695     }
1696 
1697     return response;
1698   }
1699 
1700   /**
1701    * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
1702    * that the given mutations will be durable on the receiving RS if this method returns without any
1703    * exception.
1704    * @param controller the RPC controller
1705    * @param request the request
1706    * @throws ServiceException
1707    */
1708   @Override
1709   @QosPriority(priority = HConstants.REPLAY_QOS)
replay(final RpcController controller, final ReplicateWALEntryRequest request)1710   public ReplicateWALEntryResponse replay(final RpcController controller,
1711       final ReplicateWALEntryRequest request) throws ServiceException {
1712     long before = EnvironmentEdgeManager.currentTime();
1713     CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
1714     try {
1715       checkOpen();
1716       List<WALEntry> entries = request.getEntryList();
1717       if (entries == null || entries.isEmpty()) {
1718         // empty input
1719         return ReplicateWALEntryResponse.newBuilder().build();
1720       }
1721       ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
1722       Region region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
1723       RegionCoprocessorHost coprocessorHost =
1724           ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
1725             ? region.getCoprocessorHost()
1726             : null; // do not invoke coprocessors if this is a secondary region replica
1727       List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
1728 
1729       // Skip adding the edits to WAL if this is a secondary region replica
1730       boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1731       Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
1732 
1733       for (WALEntry entry : entries) {
1734         if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
1735           throw new NotServingRegionException("Replay request contains entries from multiple " +
1736               "regions. First region:" + regionName.toStringUtf8() + " , other region:"
1737               + entry.getKey().getEncodedRegionName());
1738         }
1739         if (regionServer.nonceManager != null && isPrimary) {
1740           long nonceGroup = entry.getKey().hasNonceGroup()
1741             ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
1742           long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
1743           regionServer.nonceManager.reportOperationFromWal(
1744               nonceGroup,
1745               nonce,
1746               entry.getKey().getWriteTime());
1747         }
1748         Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
1749           new Pair<WALKey, WALEdit>();
1750         List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
1751           cells, walEntry, durability);
1752         if (coprocessorHost != null) {
1753           // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
1754           // KeyValue.
1755           if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
1756             walEntry.getSecond())) {
1757             // if bypass this log entry, ignore it ...
1758             continue;
1759           }
1760           walEntries.add(walEntry);
1761         }
1762         if(edits!=null && !edits.isEmpty()) {
1763           long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
1764             entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
1765           OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
1766           // check if it's a partial success
1767           for (int i = 0; result != null && i < result.length; i++) {
1768             if (result[i] != OperationStatus.SUCCESS) {
1769               throw new IOException(result[i].getExceptionMsg());
1770             }
1771           }
1772         }
1773       }
1774 
1775       //sync wal at the end because ASYNC_WAL is used above
1776       WAL wal = getWAL(region);
1777       if (wal != null) {
1778         wal.sync();
1779       }
1780 
1781       if (coprocessorHost != null) {
1782         for (Pair<WALKey, WALEdit> entry : walEntries) {
1783           coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
1784             entry.getSecond());
1785         }
1786       }
1787       return ReplicateWALEntryResponse.newBuilder().build();
1788     } catch (IOException ie) {
1789       throw new ServiceException(ie);
1790     } finally {
1791       if (regionServer.metricsRegionServer != null) {
1792         regionServer.metricsRegionServer.updateReplay(
1793           EnvironmentEdgeManager.currentTime() - before);
1794       }
1795     }
1796   }
1797 
getWAL(Region region)1798   WAL getWAL(Region region) {
1799     return ((HRegion)region).getWAL();
1800   }
1801 
1802   /**
1803    * Replicate WAL entries on the region server.
1804    *
1805    * @param controller the RPC controller
1806    * @param request the request
1807    * @throws ServiceException
1808    */
1809   @Override
1810   @QosPriority(priority=HConstants.REPLICATION_QOS)
replicateWALEntry(final RpcController controller, final ReplicateWALEntryRequest request)1811   public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
1812       final ReplicateWALEntryRequest request) throws ServiceException {
1813     try {
1814       checkOpen();
1815       if (regionServer.replicationSinkHandler != null) {
1816         requestCount.increment();
1817         List<WALEntry> entries = request.getEntryList();
1818         CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
1819         regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
1820         regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner);
1821         regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
1822         return ReplicateWALEntryResponse.newBuilder().build();
1823       } else {
1824         throw new ServiceException("Replication services are not initialized yet");
1825       }
1826     } catch (IOException ie) {
1827       throw new ServiceException(ie);
1828     }
1829   }
1830 
1831   /**
1832    * Roll the WAL writer of the region server.
1833    * @param controller the RPC controller
1834    * @param request the request
1835    * @throws ServiceException
1836    */
1837   @Override
rollWALWriter(final RpcController controller, final RollWALWriterRequest request)1838   public RollWALWriterResponse rollWALWriter(final RpcController controller,
1839       final RollWALWriterRequest request) throws ServiceException {
1840     try {
1841       checkOpen();
1842       requestCount.increment();
1843       regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
1844       regionServer.walRoller.requestRollAll();
1845       regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
1846       RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
1847       return builder.build();
1848     } catch (IOException ie) {
1849       throw new ServiceException(ie);
1850     }
1851   }
1852 
1853   /**
1854    * Split a region on the region server.
1855    *
1856    * @param controller the RPC controller
1857    * @param request the request
1858    * @throws ServiceException
1859    */
1860   @Override
1861   @QosPriority(priority=HConstants.ADMIN_QOS)
splitRegion(final RpcController controller, final SplitRegionRequest request)1862   public SplitRegionResponse splitRegion(final RpcController controller,
1863       final SplitRegionRequest request) throws ServiceException {
1864     try {
1865       checkOpen();
1866       requestCount.increment();
1867       Region region = getRegion(request.getRegion());
1868       region.startRegionOperation(Operation.SPLIT_REGION);
1869       if (region.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1870         throw new IOException("Can't split replicas directly. "
1871             + "Replicas are auto-split when their primary is split.");
1872       }
1873       LOG.info("Splitting " + region.getRegionInfo().getRegionNameAsString());
1874       long startTime = EnvironmentEdgeManager.currentTime();
1875       FlushResult flushResult = region.flush(true);
1876       if (flushResult.isFlushSucceeded()) {
1877         long endTime = EnvironmentEdgeManager.currentTime();
1878         regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1879       }
1880       byte[] splitPoint = null;
1881       if (request.hasSplitPoint()) {
1882         splitPoint = request.getSplitPoint().toByteArray();
1883       }
1884       ((HRegion)region).forceSplit(splitPoint);
1885       regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit(),
1886         RpcServer.getRequestUser());
1887       return SplitRegionResponse.newBuilder().build();
1888     } catch (DroppedSnapshotException ex) {
1889       regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1890       throw new ServiceException(ex);
1891     } catch (IOException ie) {
1892       throw new ServiceException(ie);
1893     }
1894   }
1895 
1896   /**
1897    * Stop the region server.
1898    *
1899    * @param controller the RPC controller
1900    * @param request the request
1901    * @throws ServiceException
1902    */
1903   @Override
1904   @QosPriority(priority=HConstants.ADMIN_QOS)
stopServer(final RpcController controller, final StopServerRequest request)1905   public StopServerResponse stopServer(final RpcController controller,
1906       final StopServerRequest request) throws ServiceException {
1907     requestCount.increment();
1908     String reason = request.getReason();
1909     regionServer.stop(reason);
1910     return StopServerResponse.newBuilder().build();
1911   }
1912 
1913   @Override
updateFavoredNodes(RpcController controller, UpdateFavoredNodesRequest request)1914   public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
1915       UpdateFavoredNodesRequest request) throws ServiceException {
1916     List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
1917     UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
1918     for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
1919       HRegionInfo hri = HRegionInfo.convert(regionUpdateInfo.getRegion());
1920       regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
1921         regionUpdateInfo.getFavoredNodesList());
1922     }
1923     respBuilder.setResponse(openInfoList.size());
1924     return respBuilder.build();
1925   }
1926 
1927   /**
1928    * Atomically bulk load several HFiles into an open region
1929    * @return true if successful, false is failed but recoverably (no action)
1930    * @throws ServiceException if failed unrecoverably
1931    */
1932   @Override
bulkLoadHFile(final RpcController controller, final BulkLoadHFileRequest request)1933   public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
1934       final BulkLoadHFileRequest request) throws ServiceException {
1935     try {
1936       checkOpen();
1937       requestCount.increment();
1938       Region region = getRegion(request.getRegion());
1939       List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
1940       for (FamilyPath familyPath: request.getFamilyPathList()) {
1941         familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
1942           familyPath.getPath()));
1943       }
1944       boolean bypass = false;
1945       if (region.getCoprocessorHost() != null) {
1946         bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
1947       }
1948       boolean loaded = false;
1949       if (!bypass) {
1950         loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
1951       }
1952       if (region.getCoprocessorHost() != null) {
1953         loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
1954       }
1955       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
1956       builder.setLoaded(loaded);
1957       return builder.build();
1958     } catch (IOException ie) {
1959       throw new ServiceException(ie);
1960     }
1961   }
1962 
1963   @Override
execService(final RpcController controller, final CoprocessorServiceRequest request)1964   public CoprocessorServiceResponse execService(final RpcController controller,
1965       final CoprocessorServiceRequest request) throws ServiceException {
1966     try {
1967       checkOpen();
1968       requestCount.increment();
1969       Region region = getRegion(request.getRegion());
1970       Message result = execServiceOnRegion(region, request.getCall());
1971       CoprocessorServiceResponse.Builder builder =
1972         CoprocessorServiceResponse.newBuilder();
1973       builder.setRegion(RequestConverter.buildRegionSpecifier(
1974         RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName()));
1975       builder.setValue(
1976         builder.getValueBuilder().setName(result.getClass().getName())
1977           .setValue(result.toByteString()));
1978       return builder.build();
1979     } catch (IOException ie) {
1980       throw new ServiceException(ie);
1981     }
1982   }
1983 
execServiceOnRegion(Region region, final ClientProtos.CoprocessorServiceCall serviceCall)1984   private Message execServiceOnRegion(Region region,
1985       final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
1986     // ignore the passed in controller (from the serialized call)
1987     ServerRpcController execController = new ServerRpcController();
1988     return region.execService(execController, serviceCall);
1989   }
1990 
1991   /**
1992    * Get data from a table.
1993    *
1994    * @param controller the RPC controller
1995    * @param request the get request
1996    * @throws ServiceException
1997    */
1998   @Override
get(final RpcController controller, final GetRequest request)1999   public GetResponse get(final RpcController controller,
2000       final GetRequest request) throws ServiceException {
2001     long before = EnvironmentEdgeManager.currentTime();
2002     OperationQuota quota = null;
2003     try {
2004       checkOpen();
2005       requestCount.increment();
2006       Region region = getRegion(request.getRegion());
2007 
2008       GetResponse.Builder builder = GetResponse.newBuilder();
2009       ClientProtos.Get get = request.getGet();
2010       Boolean existence = null;
2011       Result r = null;
2012       quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
2013 
2014       if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2015         if (get.getColumnCount() != 1) {
2016           throw new DoNotRetryIOException(
2017             "get ClosestRowBefore supports one and only one family now, not "
2018               + get.getColumnCount() + " families");
2019         }
2020         byte[] row = get.getRow().toByteArray();
2021         byte[] family = get.getColumn(0).getFamily().toByteArray();
2022         r = region.getClosestRowBefore(row, family);
2023       } else {
2024         Get clientGet = ProtobufUtil.toGet(get);
2025         if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2026           existence = region.getCoprocessorHost().preExists(clientGet);
2027         }
2028         if (existence == null) {
2029           r = region.get(clientGet);
2030           if (get.getExistenceOnly()) {
2031             boolean exists = r.getExists();
2032             if (region.getCoprocessorHost() != null) {
2033               exists = region.getCoprocessorHost().postExists(clientGet, exists);
2034             }
2035             existence = exists;
2036           }
2037         }
2038       }
2039       if (existence != null){
2040         ClientProtos.Result pbr =
2041             ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2042         builder.setResult(pbr);
2043       } else  if (r != null) {
2044         ClientProtos.Result pbr = ProtobufUtil.toResult(r);
2045         builder.setResult(pbr);
2046       }
2047       if (r != null) {
2048         quota.addGetResult(r);
2049       }
2050       return builder.build();
2051     } catch (IOException ie) {
2052       throw new ServiceException(ie);
2053     } finally {
2054       if (regionServer.metricsRegionServer != null) {
2055         regionServer.metricsRegionServer.updateGet(
2056           EnvironmentEdgeManager.currentTime() - before);
2057       }
2058       if (quota != null) {
2059         quota.close();
2060       }
2061     }
2062   }
2063 
2064   /**
2065    * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2066    *
2067    * @param rpcc the RPC controller
2068    * @param request the multi request
2069    * @throws ServiceException
2070    */
2071   @Override
multi(final RpcController rpcc, final MultiRequest request)2072   public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2073   throws ServiceException {
2074     try {
2075       checkOpen();
2076     } catch (IOException ie) {
2077       throw new ServiceException(ie);
2078     }
2079 
2080     // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2081     // It is also the conduit via which we pass back data.
2082     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2083     CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
2084     if (controller != null) {
2085       controller.setCellScanner(null);
2086     }
2087 
2088     long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2089 
2090     // this will contain all the cells that we need to return. It's created later, if needed.
2091     List<CellScannable> cellsToReturn = null;
2092     MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2093     RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2094     Boolean processed = null;
2095 
2096     for (RegionAction regionAction : request.getRegionActionList()) {
2097       this.requestCount.add(regionAction.getActionCount());
2098       OperationQuota quota;
2099       Region region;
2100       regionActionResultBuilder.clear();
2101       try {
2102         region = getRegion(regionAction.getRegion());
2103         quota = getQuotaManager().checkQuota(region, regionAction.getActionList());
2104       } catch (IOException e) {
2105         rpcServer.getMetrics().exception(e);
2106         regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2107         responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2108         // All Mutations in this RegionAction not executed as we can not see the Region online here
2109         // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
2110         // corresponding to these Mutations.
2111         if (cellScanner != null) {
2112           skipCellsForMutations(regionAction.getActionList(), cellScanner);
2113         }
2114         continue;  // For this region it's a failure.
2115       }
2116 
2117       if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2118         // How does this call happen?  It may need some work to play well w/ the surroundings.
2119         // Need to return an item per Action along w/ Action index.  TODO.
2120         try {
2121           if (request.hasCondition()) {
2122             Condition condition = request.getCondition();
2123             byte[] row = condition.getRow().toByteArray();
2124             byte[] family = condition.getFamily().toByteArray();
2125             byte[] qualifier = condition.getQualifier().toByteArray();
2126             CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2127             ByteArrayComparable comparator =
2128                 ProtobufUtil.toComparator(condition.getComparator());
2129             processed = checkAndRowMutate(region, regionAction.getActionList(),
2130                   cellScanner, row, family, qualifier, compareOp, comparator);
2131           } else {
2132             ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(),
2133                 cellScanner);
2134             // add the stats to the request
2135             if(stats != null) {
2136               responseBuilder.addRegionActionResult(RegionActionResult.newBuilder()
2137                   .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats)));
2138             }
2139             processed = Boolean.TRUE;
2140           }
2141         } catch (IOException e) {
2142           rpcServer.getMetrics().exception(e);
2143           // As it's atomic, we may expect it's a global failure.
2144           regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2145         }
2146       } else {
2147         // doNonAtomicRegionMutation manages the exception internally
2148         cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2149             regionActionResultBuilder, cellsToReturn, nonceGroup);
2150       }
2151       responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2152       quota.close();
2153     }
2154     // Load the controller with the Cells to return.
2155     if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2156       controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2157     }
2158     if (processed != null) {
2159       responseBuilder.setProcessed(processed);
2160     }
2161     return responseBuilder.build();
2162   }
2163 
skipCellsForMutations(List<Action> actions, CellScanner cellScanner)2164   private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2165     for (Action action : actions) {
2166       skipCellsForMutation(action, cellScanner);
2167     }
2168   }
2169 
skipCellsForMutation(Action action, CellScanner cellScanner)2170   private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2171     try {
2172       if (action.hasMutation()) {
2173         MutationProto m = action.getMutation();
2174         if (m.hasAssociatedCellCount()) {
2175           for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2176             cellScanner.advance();
2177           }
2178         }
2179       }
2180     } catch (IOException e) {
2181       // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2182       // marked as failed as we could not see the Region here. At client side the top level
2183       // RegionAction exception will be considered first.
2184       LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2185     }
2186   }
2187 
2188   /**
2189    * Mutate data in a table.
2190    *
2191    * @param rpcc the RPC controller
2192    * @param request the mutate request
2193    * @throws ServiceException
2194    */
2195   @Override
mutate(final RpcController rpcc, final MutateRequest request)2196   public MutateResponse mutate(final RpcController rpcc,
2197       final MutateRequest request) throws ServiceException {
2198     // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2199     // It is also the conduit via which we pass back data.
2200     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2201     CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2202     OperationQuota quota = null;
2203     // Clear scanner so we are not holding on to reference across call.
2204     if (controller != null) {
2205       controller.setCellScanner(null);
2206     }
2207     try {
2208       checkOpen();
2209       requestCount.increment();
2210       Region region = getRegion(request.getRegion());
2211       MutateResponse.Builder builder = MutateResponse.newBuilder();
2212       MutationProto mutation = request.getMutation();
2213       if (!region.getRegionInfo().isMetaTable()) {
2214         regionServer.cacheFlusher.reclaimMemStoreMemory();
2215       }
2216       long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2217       Result r = null;
2218       Boolean processed = null;
2219       MutationType type = mutation.getMutateType();
2220       long mutationSize = 0;
2221       quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2222       switch (type) {
2223       case APPEND:
2224         // TODO: this doesn't actually check anything.
2225         r = append(region, quota, mutation, cellScanner, nonceGroup);
2226         break;
2227       case INCREMENT:
2228         // TODO: this doesn't actually check anything.
2229         r = increment(region, quota, mutation, cellScanner, nonceGroup);
2230         break;
2231       case PUT:
2232         Put put = ProtobufUtil.toPut(mutation, cellScanner);
2233         quota.addMutation(put);
2234         if (request.hasCondition()) {
2235           Condition condition = request.getCondition();
2236           byte[] row = condition.getRow().toByteArray();
2237           byte[] family = condition.getFamily().toByteArray();
2238           byte[] qualifier = condition.getQualifier().toByteArray();
2239           CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2240           ByteArrayComparable comparator =
2241             ProtobufUtil.toComparator(condition.getComparator());
2242           if (region.getCoprocessorHost() != null) {
2243             processed = region.getCoprocessorHost().preCheckAndPut(
2244               row, family, qualifier, compareOp, comparator, put);
2245           }
2246           if (processed == null) {
2247             boolean result = region.checkAndMutate(row, family,
2248               qualifier, compareOp, comparator, put, true);
2249             if (region.getCoprocessorHost() != null) {
2250               result = region.getCoprocessorHost().postCheckAndPut(row, family,
2251                 qualifier, compareOp, comparator, put, result);
2252             }
2253             processed = result;
2254           }
2255         } else {
2256           region.put(put);
2257           processed = Boolean.TRUE;
2258         }
2259         break;
2260       case DELETE:
2261         Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2262         quota.addMutation(delete);
2263         if (request.hasCondition()) {
2264           Condition condition = request.getCondition();
2265           byte[] row = condition.getRow().toByteArray();
2266           byte[] family = condition.getFamily().toByteArray();
2267           byte[] qualifier = condition.getQualifier().toByteArray();
2268           CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2269           ByteArrayComparable comparator =
2270             ProtobufUtil.toComparator(condition.getComparator());
2271           if (region.getCoprocessorHost() != null) {
2272             processed = region.getCoprocessorHost().preCheckAndDelete(
2273               row, family, qualifier, compareOp, comparator, delete);
2274           }
2275           if (processed == null) {
2276             boolean result = region.checkAndMutate(row, family,
2277               qualifier, compareOp, comparator, delete, true);
2278             if (region.getCoprocessorHost() != null) {
2279               result = region.getCoprocessorHost().postCheckAndDelete(row, family,
2280                 qualifier, compareOp, comparator, delete, result);
2281             }
2282             processed = result;
2283           }
2284         } else {
2285           region.delete(delete);
2286           processed = Boolean.TRUE;
2287         }
2288         break;
2289       default:
2290           throw new DoNotRetryIOException(
2291             "Unsupported mutate type: " + type.name());
2292       }
2293       if (processed != null) builder.setProcessed(processed.booleanValue());
2294       addResult(builder, r, controller);
2295       return builder.build();
2296     } catch (IOException ie) {
2297       regionServer.checkFileSystem();
2298       throw new ServiceException(ie);
2299     } finally {
2300       if (quota != null) {
2301         quota.close();
2302       }
2303     }
2304   }
2305 
2306   /**
2307    * Scan data in a table.
2308    *
2309    * @param controller the RPC controller
2310    * @param request the scan request
2311    * @throws ServiceException
2312    */
2313   @Override
scan(final RpcController controller, final ScanRequest request)2314   public ScanResponse scan(final RpcController controller, final ScanRequest request)
2315   throws ServiceException {
2316     OperationQuota quota = null;
2317     Leases.Lease lease = null;
2318     String scannerName = null;
2319     try {
2320       if (!request.hasScannerId() && !request.hasScan()) {
2321         throw new DoNotRetryIOException(
2322           "Missing required input: scannerId or scan");
2323       }
2324       long scannerId = -1;
2325       if (request.hasScannerId()) {
2326         scannerId = request.getScannerId();
2327         scannerName = String.valueOf(scannerId);
2328       }
2329       try {
2330         checkOpen();
2331       } catch (IOException e) {
2332         // If checkOpen failed, server not running or filesystem gone,
2333         // cancel this lease; filesystem is gone or we're closing or something.
2334         if (scannerName != null) {
2335           LOG.debug("Server shutting down and client tried to access missing scanner "
2336             + scannerName);
2337           if (regionServer.leases != null) {
2338             try {
2339               regionServer.leases.cancelLease(scannerName);
2340             } catch (LeaseException le) {
2341               // No problem, ignore
2342               if (LOG.isTraceEnabled()) {
2343                 LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
2344               }
2345              }
2346           }
2347         }
2348         throw e;
2349       }
2350       requestCount.increment();
2351 
2352       int ttl = 0;
2353       Region region = null;
2354       RegionScanner scanner = null;
2355       RegionScannerHolder rsh = null;
2356       boolean moreResults = true;
2357       boolean closeScanner = false;
2358       boolean isSmallScan = false;
2359       RpcCallContext context = RpcServer.getCurrentCall();
2360       Object lastBlock = null;
2361 
2362       ScanResponse.Builder builder = ScanResponse.newBuilder();
2363       if (request.hasCloseScanner()) {
2364         closeScanner = request.getCloseScanner();
2365       }
2366       int rows = closeScanner ? 0 : 1;
2367       if (request.hasNumberOfRows()) {
2368         rows = request.getNumberOfRows();
2369       }
2370       if (request.hasScannerId()) {
2371         rsh = scanners.get(scannerName);
2372         if (rsh == null) {
2373           LOG.info("Client tried to access missing scanner " + scannerName);
2374           throw new UnknownScannerException(
2375             "Name: " + scannerName + ", already closed?");
2376         }
2377         scanner = rsh.s;
2378         HRegionInfo hri = scanner.getRegionInfo();
2379         region = regionServer.getRegion(hri.getRegionName());
2380         if (region != rsh.r) { // Yes, should be the same instance
2381           throw new NotServingRegionException("Region was re-opened after the scanner"
2382             + scannerName + " was created: " + hri.getRegionNameAsString());
2383         }
2384       } else {
2385         region = getRegion(request.getRegion());
2386         ClientProtos.Scan protoScan = request.getScan();
2387         boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
2388         Scan scan = ProtobufUtil.toScan(protoScan);
2389         // if the request doesn't set this, get the default region setting.
2390         if (!isLoadingCfsOnDemandSet) {
2391           scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
2392         }
2393 
2394         isSmallScan = scan.isSmall();
2395         if (!scan.hasFamilies()) {
2396           // Adding all families to scanner
2397           for (byte[] family: region.getTableDesc().getFamiliesKeys()) {
2398             scan.addFamily(family);
2399           }
2400         }
2401 
2402         if (region.getCoprocessorHost() != null) {
2403           scanner = region.getCoprocessorHost().preScannerOpen(scan);
2404         }
2405         if (scanner == null) {
2406           scanner = region.getScanner(scan);
2407         }
2408         if (region.getCoprocessorHost() != null) {
2409           scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
2410         }
2411         scannerId = addScanner(scanner, region);
2412         scannerName = String.valueOf(scannerId);
2413         ttl = this.scannerLeaseTimeoutPeriod;
2414       }
2415       if (request.hasRenew() && request.getRenew()) {
2416         rsh = scanners.get(scannerName);
2417         lease = regionServer.leases.removeLease(scannerName);
2418         if (lease != null && rsh != null) {
2419           regionServer.leases.addLease(lease);
2420           // Increment the nextCallSeq value which is the next expected from client.
2421           rsh.incNextCallSeq();
2422         }
2423         return builder.build();
2424       }
2425 
2426       quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
2427       long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
2428       if (rows > 0) {
2429         // if nextCallSeq does not match throw Exception straight away. This needs to be
2430         // performed even before checking of Lease.
2431         // See HBASE-5974
2432         if (request.hasNextCallSeq()) {
2433           if (rsh == null) {
2434             rsh = scanners.get(scannerName);
2435           }
2436           if (rsh != null) {
2437             if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
2438               throw new OutOfOrderScannerNextException(
2439                 "Expected nextCallSeq: " + rsh.getNextCallSeq()
2440                 + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
2441                 "; request=" + TextFormat.shortDebugString(request));
2442             }
2443             // Increment the nextCallSeq value which is the next expected from client.
2444             rsh.incNextCallSeq();
2445           }
2446         }
2447         try {
2448           // Remove lease while its being processed in server; protects against case
2449           // where processing of request takes > lease expiration time.
2450           lease = regionServer.leases.removeLease(scannerName);
2451           List<Result> results = new ArrayList<Result>();
2452 
2453           boolean done = false;
2454           // Call coprocessor. Get region info from scanner.
2455           if (region != null && region.getCoprocessorHost() != null) {
2456             Boolean bypass = region.getCoprocessorHost().preScannerNext(
2457               scanner, results, rows);
2458             if (!results.isEmpty()) {
2459               for (Result r : results) {
2460                 lastBlock = addSize(context, r, lastBlock);
2461               }
2462             }
2463             if (bypass != null && bypass.booleanValue()) {
2464               done = true;
2465             }
2466           }
2467 
2468           if (!done) {
2469             long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
2470             if (maxResultSize <= 0) {
2471               maxResultSize = maxQuotaResultSize;
2472             }
2473             // This is cells inside a row. Default size is 10 so if many versions or many cfs,
2474             // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
2475             // arbitrary 32. TODO: keep record of general size of results being returned.
2476             List<Cell> values = new ArrayList<Cell>(32);
2477             region.startRegionOperation(Operation.SCAN);
2478             try {
2479               int i = 0;
2480               synchronized(scanner) {
2481                 boolean stale = (region.getRegionInfo().getReplicaId() != 0);
2482                 boolean clientHandlesPartials =
2483                     request.hasClientHandlesPartials() && request.getClientHandlesPartials();
2484                 boolean clientHandlesHeartbeats =
2485                     request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
2486 
2487                 // On the server side we must ensure that the correct ordering of partial results is
2488                 // returned to the client to allow them to properly reconstruct the partial results.
2489                 // If the coprocessor host is adding to the result list, we cannot guarantee the
2490                 // correct ordering of partial results and so we prevent partial results from being
2491                 // formed.
2492                 boolean serverGuaranteesOrderOfPartials = results.isEmpty();
2493                 boolean allowPartialResults =
2494                     clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
2495                 boolean moreRows = false;
2496 
2497                 // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
2498                 // certain time threshold on the server. When the time threshold is exceeded, the
2499                 // server stops the scan and sends back whatever Results it has accumulated within
2500                 // that time period (may be empty). Since heartbeat messages have the potential to
2501                 // create partial Results (in the event that the timeout occurs in the middle of a
2502                 // row), we must only generate heartbeat messages when the client can handle both
2503                 // heartbeats AND partials
2504                 boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
2505 
2506                 // Default value of timeLimit is negative to indicate no timeLimit should be
2507                 // enforced.
2508                 long timeLimit = -1;
2509 
2510                 // Set the time limit to be half of the more restrictive timeout value (one of the
2511                 // timeout values must be positive). In the event that both values are positive, the
2512                 // more restrictive of the two is used to calculate the limit.
2513                 if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
2514                   long timeLimitDelta;
2515                   if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
2516                     timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
2517                   } else {
2518                     timeLimitDelta =
2519                         scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
2520                   }
2521                   // Use half of whichever timeout value was more restrictive... But don't allow
2522                   // the time limit to be less than the allowable minimum (could cause an
2523                   // immediatate timeout before scanning any data).
2524                   timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
2525                   timeLimit = System.currentTimeMillis() + timeLimitDelta;
2526                 }
2527 
2528                 final LimitScope sizeScope =
2529                     allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2530                 final LimitScope timeScope =
2531                     allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2532 
2533                 boolean trackMetrics =
2534                     request.hasTrackScanMetrics() && request.getTrackScanMetrics();
2535 
2536                 // Configure with limits for this RPC. Set keep progress true since size progress
2537                 // towards size limit should be kept between calls to nextRaw
2538                 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
2539                 contextBuilder.setSizeLimit(sizeScope, maxResultSize);
2540                 contextBuilder.setBatchLimit(scanner.getBatch());
2541                 contextBuilder.setTimeLimit(timeScope, timeLimit);
2542                 contextBuilder.setTrackMetrics(trackMetrics);
2543                 ScannerContext scannerContext = contextBuilder.build();
2544 
2545                 boolean limitReached = false;
2546                 while (i < rows) {
2547                   // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
2548                   // batch limit is a limit on the number of cells per Result. Thus, if progress is
2549                   // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
2550                   // reset the batch progress between nextRaw invocations since we don't want the
2551                   // batch progress from previous calls to affect future calls
2552                   scannerContext.setBatchProgress(0);
2553 
2554                   // Collect values to be returned here
2555                   moreRows = scanner.nextRaw(values, scannerContext);
2556 
2557                   if (!values.isEmpty()) {
2558                     final boolean partial = scannerContext.partialResultFormed();
2559                     Result r = Result.create(values, null, stale, partial);
2560                     lastBlock = addSize(context, r, lastBlock);
2561                     results.add(r);
2562                     i++;
2563                   }
2564 
2565                   boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
2566                   boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
2567                   boolean rowLimitReached = i >= rows;
2568                   limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
2569 
2570                   if (limitReached || !moreRows) {
2571                     if (LOG.isTraceEnabled()) {
2572                       LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
2573                           + moreRows + " scannerContext: " + scannerContext);
2574                     }
2575                     // We only want to mark a ScanResponse as a heartbeat message in the event that
2576                     // there are more values to be read server side. If there aren't more values,
2577                     // marking it as a heartbeat is wasteful because the client will need to issue
2578                     // another ScanRequest only to realize that they already have all the values
2579                     if (moreRows) {
2580                       // Heartbeat messages occur when the time limit has been reached.
2581                       builder.setHeartbeatMessage(timeLimitReached);
2582                     }
2583                     break;
2584                   }
2585                   values.clear();
2586                 }
2587 
2588                 if (limitReached || moreRows) {
2589                   // We stopped prematurely
2590                   builder.setMoreResultsInRegion(true);
2591                 } else {
2592                   // We didn't get a single batch
2593                   builder.setMoreResultsInRegion(false);
2594                 }
2595 
2596                 // Check to see if the client requested that we track metrics server side. If the
2597                 // client requested metrics, retrieve the metrics from the scanner context.
2598                 if (trackMetrics) {
2599                   Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
2600                   ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
2601                   NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
2602 
2603                   for (Entry<String, Long> entry : metrics.entrySet()) {
2604                     pairBuilder.setName(entry.getKey());
2605                     pairBuilder.setValue(entry.getValue());
2606                     metricBuilder.addMetrics(pairBuilder.build());
2607                   }
2608 
2609                   builder.setScanMetrics(metricBuilder.build());
2610                 }
2611               }
2612               region.updateReadRequestsCount(i);
2613               long responseCellSize = context != null ? context.getResponseCellSize() : 0;
2614               region.getMetrics().updateScanNext(responseCellSize);
2615               if (regionServer.metricsRegionServer != null) {
2616                 regionServer.metricsRegionServer.updateScannerNext(responseCellSize);
2617               }
2618             } finally {
2619               region.closeRegionOperation();
2620             }
2621 
2622             // coprocessor postNext hook
2623             if (region != null && region.getCoprocessorHost() != null) {
2624               region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
2625             }
2626           }
2627 
2628           quota.addScanResult(results);
2629 
2630           // If the scanner's filter - if any - is done with the scan
2631           // and wants to tell the client to stop the scan. This is done by passing
2632           // a null result, and setting moreResults to false.
2633           if (scanner.isFilterDone() && results.isEmpty()) {
2634             moreResults = false;
2635             results = null;
2636           } else {
2637             addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
2638           }
2639         } catch (IOException e) {
2640           // if we have an exception on scanner next and we are using the callSeq
2641           // we should rollback because the client will retry with the same callSeq
2642           // and get an OutOfOrderScannerNextException if we don't do so.
2643           if (rsh != null && request.hasNextCallSeq()) {
2644             rsh.rollbackNextCallSeq();
2645           }
2646           throw e;
2647         } finally {
2648           // We're done. On way out re-add the above removed lease.
2649           // Adding resets expiration time on lease.
2650           if (scanners.containsKey(scannerName)) {
2651             if (lease != null) regionServer.leases.addLease(lease);
2652             ttl = this.scannerLeaseTimeoutPeriod;
2653           }
2654         }
2655       }
2656 
2657       if (!moreResults || closeScanner) {
2658         ttl = 0;
2659         moreResults = false;
2660         if (region != null && region.getCoprocessorHost() != null) {
2661           if (region.getCoprocessorHost().preScannerClose(scanner)) {
2662             return builder.build(); // bypass
2663           }
2664         }
2665         rsh = scanners.remove(scannerName);
2666         if (rsh != null) {
2667           scanner = rsh.s;
2668           scanner.close();
2669           regionServer.leases.cancelLease(scannerName);
2670           if (region != null && region.getCoprocessorHost() != null) {
2671             region.getCoprocessorHost().postScannerClose(scanner);
2672           }
2673         }
2674       }
2675 
2676       if (ttl > 0) {
2677         builder.setTtl(ttl);
2678       }
2679       builder.setScannerId(scannerId);
2680       builder.setMoreResults(moreResults);
2681       return builder.build();
2682     } catch (IOException ie) {
2683       if (scannerName != null && ie instanceof NotServingRegionException) {
2684         RegionScannerHolder rsh = scanners.remove(scannerName);
2685         if (rsh != null) {
2686           try {
2687             RegionScanner scanner = rsh.s;
2688             LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ...");
2689             scanner.close();
2690             regionServer.leases.cancelLease(scannerName);
2691           } catch (IOException e) {
2692            LOG.warn("Getting exception closing " + scannerName, e);
2693           }
2694         }
2695       }
2696       throw new ServiceException(ie);
2697     } finally {
2698       if (quota != null) {
2699         quota.close();
2700       }
2701     }
2702   }
2703 
2704   @Override
execRegionServerService(RpcController controller, CoprocessorServiceRequest request)2705   public CoprocessorServiceResponse execRegionServerService(RpcController controller,
2706       CoprocessorServiceRequest request) throws ServiceException {
2707     return regionServer.execRegionServerService(controller, request);
2708   }
2709 
2710   @Override
updateConfiguration( RpcController controller, UpdateConfigurationRequest request)2711   public UpdateConfigurationResponse updateConfiguration(
2712       RpcController controller, UpdateConfigurationRequest request)
2713       throws ServiceException {
2714     try {
2715       this.regionServer.updateConfiguration();
2716     } catch (Exception e) {
2717       throw new ServiceException(e);
2718     }
2719     return UpdateConfigurationResponse.getDefaultInstance();
2720   }
2721 }
2722