/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.client.coprocessor;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

import com.google.protobuf.ByteString;
import com.google.protobuf.Message;

/**
 * This client class is for invoking the aggregate functions deployed on the
 * Region Server side via the AggregateService. This class will implement the
 * supporting functionality for summing/processing the individual results
 * obtained from the AggregateService for each region.
 * <p>
 * This will serve as the client side handler for invoking the aggregate
 * functions.
 * For all aggregate functions,
 * <ul>
 * <li>start row &lt; end row is an essential condition (if they are not
 * {@link HConstants#EMPTY_BYTE_ARRAY})</li>
 * <li>Column family can't be null. In case where multiple families are
 * provided, an IOException will be thrown. An optional column qualifier can
 * also be defined.</li>
 * <li>For methods to find maximum, minimum, sum, rowcount, it returns the
 * parameter type. For average and std, it returns a double value. For row
 * count, it returns a long value.</li>
 * </ul>
 * <p>Call {@link #close()} when done.
 */
@InterfaceAudience.Private
public class AggregationClient implements Closeable {
  // TODO: This class is not used. Move to examples?
  private static final Log log = LogFactory.getLog(AggregationClient.class);
  private final Connection connection;

  /**
   * Creates a client that owns a new {@link Connection} built from the given configuration.
   * The connection is reused by every {@link TableName}-based call and released by
   * {@link #close()}.
   * @param cfg configuration used to create the connection
   * @throws RuntimeException wrapping the {@link IOException} if the connection cannot be created
   */
  public AggregationClient(Configuration cfg) {
    try {
      // Create a connection on construction. Will use it making each of the calls below.
      this.connection = ConnectionFactory.createConnection(cfg);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Closes the underlying connection if it is still open.
   * @throws IOException if closing the connection fails
   */
  @Override
  public void close() throws IOException {
    if (this.connection != null && !this.connection.isClosed()) {
      this.connection.close();
    }
  }

  /**
   * It gives the maximum value of a column for a given column family for the
   * given range. In case qualifier is null, a max of all values for the given
   * family is returned.
   * @param tableName table to aggregate over; opened and closed by this call
   * @param ci column interpreter used to decode and compare values
   * @param scan scan describing the row range and the single column family
   * @return the maximum value, or null if no region returned one
   * @throws Throwable
   *           The caller is supposed to handle the exception as they are thrown
   *           &amp; propagated to it.
   */
  public <R, S, P extends Message, Q extends Message, T extends Message> R max(
      final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
      throws Throwable {
    try (Table table = connection.getTable(tableName)) {
      return max(table, ci, scan);
    }
  }

  /**
   * It gives the maximum value of a column for a given column family for the
   * given range. In case qualifier is null, a max of all values for the given
   * family is returned.
   * @param table table to aggregate over; not closed by this call
   * @param ci column interpreter used to decode and compare values
   * @param scan scan describing the row range and the single column family
   * @return the maximum value, or null if no region returned one
   * @throws Throwable
   *           The caller is supposed to handle the exception as they are thrown
   *           &amp; propagated to it.
   */
  public <R, S, P extends Message, Q extends Message, T extends Message>
  R max(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
      final Scan scan) throws Throwable {
    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
    class MaxCallBack implements Batch.Callback<R> {
      R max = null;

      R getMax() {
        return max;
      }

      @Override
      public synchronized void update(byte[] region, byte[] row, R result) {
        // Keep the larger of the running maximum and this region's (possibly null) result.
        max = (max == null || (result != null && ci.compare(max, result) < 0)) ? result : max;
      }
    }
    MaxCallBack aMaxCallBack = new MaxCallBack();
    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
        new Batch.Call<AggregateService, R>() {
          @Override
          public R call(AggregateService instance) throws IOException {
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<AggregateResponse> rpcCallback = new BlockingRpcCallback<>();
            instance.getMax(controller, requestArg, rpcCallback);
            AggregateResponse response = awaitResponse(controller, rpcCallback);
            if (response.getFirstPartCount() > 0) {
              ByteString b = response.getFirstPart(0);
              Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
              return ci.getCellValueFromProto(q);
            }
            return null;
          }
        }, aMaxCallBack);
    return aMaxCallBack.getMax();
  }

  /**
   * Checks that the scan is usable for an aggregate call.
   * @param scan scan to validate; must not be null
   * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan
   * @throws IOException if the scan is null, if start row equals a non-empty stop row, if
   *           start row sorts after a non-empty stop row, or if exactly one family is
   *           required and the scan does not have exactly one
   */
  private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
    if (scan == null) {
      throw new IOException("Agg client Exception: Scan should not be null");
    }
    byte[] startRow = scan.getStartRow();
    byte[] stopRow = scan.getStopRow();
    boolean emptyRange =
        Bytes.equals(startRow, stopRow) && !Bytes.equals(startRow, HConstants.EMPTY_START_ROW);
    boolean invertedRange =
        Bytes.compareTo(startRow, stopRow) > 0 && !Bytes.equals(stopRow, HConstants.EMPTY_END_ROW);
    if (emptyRange || invertedRange) {
      throw new IOException(
          "Agg client Exception: Startrow should be smaller than Stoprow");
    }
    if (!canFamilyBeAbsent && scan.getFamilyMap().size() != 1) {
      throw new IOException("There must be only one family.");
    }
  }

  /**
   * It gives the minimum value of a column for a given column family for the
   * given range. In case qualifier is null, a min of all values for the given
   * family is returned.
   * @param tableName table to aggregate over; opened and closed by this call
   * @param ci column interpreter used to decode and compare values
   * @param scan scan describing the row range and the single column family
   * @return the minimum value, or null if no region returned one
   * @throws Throwable if validation or the coprocessor invocation fails
   */
  public <R, S, P extends Message, Q extends Message, T extends Message> R min(
      final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
      throws Throwable {
    try (Table table = connection.getTable(tableName)) {
      return min(table, ci, scan);
    }
  }

  /**
   * It gives the minimum value of a column for a given column family for the
   * given range. In case qualifier is null, a min of all values for the given
   * family is returned.
   * @param table table to aggregate over; not closed by this call
   * @param ci column interpreter used to decode and compare values
   * @param scan scan describing the row range and the single column family
   * @return the minimum value, or null if no region returned one
   * @throws Throwable if validation or the coprocessor invocation fails
   */
  public <R, S, P extends Message, Q extends Message, T extends Message>
  R min(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
      final Scan scan) throws Throwable {
    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
    class MinCallBack implements Batch.Callback<R> {

      private R min = null;

      public R getMinimum() {
        return min;
      }

      @Override
      public synchronized void update(byte[] region, byte[] row, R result) {
        // Keep the smaller of the running minimum and this region's (possibly null) result.
        min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
      }
    }
    MinCallBack minCallBack = new MinCallBack();
    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
        new Batch.Call<AggregateService, R>() {

          @Override
          public R call(AggregateService instance) throws IOException {
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<AggregateResponse> rpcCallback = new BlockingRpcCallback<>();
            instance.getMin(controller, requestArg, rpcCallback);
            AggregateResponse response = awaitResponse(controller, rpcCallback);
            if (response.getFirstPartCount() > 0) {
              ByteString b = response.getFirstPart(0);
              Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
              return ci.getCellValueFromProto(q);
            }
            return null;
          }
        }, minCallBack);
    log.debug("Min fom all regions is: " + minCallBack.getMinimum());
    return minCallBack.getMinimum();
  }

  /**
   * It gives the row count, by summing up the individual results obtained from
   * regions. In case the qualifier is null, FirstKeyValueFilter is used to
   * optimised the operation. In case qualifier is provided, I can't use the
   * filter as it may set the flag to skip to next row, but the value read is
   * not of the given filter: in this case, this particular row will not be
   * counted ==&gt; an error.
   * @param tableName table to count rows in; opened and closed by this call
   * @param ci column interpreter sent along with the request
   * @param scan scan describing the row range; the column family may be absent
   * @return the sum of the per-region row counts
   * @throws Throwable if validation or the coprocessor invocation fails
   */
  public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
      final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
      throws Throwable {
    try (Table table = connection.getTable(tableName)) {
      return rowCount(table, ci, scan);
    }
  }

  /**
   * It gives the row count, by summing up the individual results obtained from
   * regions. In case the qualifier is null, FirstKeyValueFilter is used to
   * optimised the operation. In case qualifier is provided, I can't use the
   * filter as it may set the flag to skip to next row, but the value read is
   * not of the given filter: in this case, this particular row will not be
   * counted ==&gt; an error.
   * @param table table to count rows in; not closed by this call
   * @param ci column interpreter sent along with the request
   * @param scan scan describing the row range; the column family may be absent
   * @return the sum of the per-region row counts
   * @throws Throwable if validation or the coprocessor invocation fails
   */
  public <R, S, P extends Message, Q extends Message, T extends Message>
  long rowCount(final Table table,
      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true);
    class RowNumCallback implements Batch.Callback<Long> {
      private final AtomicLong rowCountL = new AtomicLong(0);

      public long getRowNumCount() {
        return rowCountL.get();
      }

      @Override
      public void update(byte[] region, byte[] row, Long result) {
        rowCountL.addAndGet(result.longValue());
      }
    }
    RowNumCallback rowNum = new RowNumCallback();
    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
        new Batch.Call<AggregateService, Long>() {
          @Override
          public Long call(AggregateService instance) throws IOException {
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<AggregateResponse> rpcCallback = new BlockingRpcCallback<>();
            instance.getRowNum(controller, requestArg, rpcCallback);
            AggregateResponse response = awaitResponse(controller, rpcCallback);
            return toLong(response.getFirstPart(0));
          }
        }, rowNum);
    return rowNum.getRowNumCount();
  }

  /**
   * It sums up the value returned from various regions. In case qualifier is
   * null, summation of all the column qualifiers in the given family is done.
   * @param tableName table to aggregate over; opened and closed by this call
   * @param ci column interpreter used to decode and add values
   * @param scan scan describing the row range and the single column family
   * @return the combined sum as produced by {@code ci.add}
   * @throws Throwable if validation or the coprocessor invocation fails
   */
  public <R, S, P extends Message, Q extends Message, T extends Message> S sum(
      final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
      throws Throwable {
    try (Table table = connection.getTable(tableName)) {
      return sum(table, ci, scan);
    }
  }

  /**
   * It sums up the value returned from various regions. In case qualifier is
   * null, summation of all the column qualifiers in the given family is done.
   * @param table table to aggregate over; not closed by this call
   * @param ci column interpreter used to decode and add values
   * @param scan scan describing the row range and the single column family
   * @return the combined sum as produced by {@code ci.add}
   * @throws Throwable if validation or the coprocessor invocation fails
   */
  public <R, S, P extends Message, Q extends Message, T extends Message>
  S sum(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
      final Scan scan) throws Throwable {
    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);

    class SumCallBack implements Batch.Callback<S> {
      S sumVal = null;

      public S getSumResult() {
        return sumVal;
      }

      @Override
      public synchronized void update(byte[] region, byte[] row, S result) {
        sumVal = ci.add(sumVal, result);
      }
    }
    SumCallBack sumCallBack = new SumCallBack();
    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
        new Batch.Call<AggregateService, S>() {
          @Override
          public S call(AggregateService instance) throws IOException {
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<AggregateResponse> rpcCallback = new BlockingRpcCallback<>();
            instance.getSum(controller, requestArg, rpcCallback);
            AggregateResponse response = awaitResponse(controller, rpcCallback);
            if (response.getFirstPartCount() == 0) {
              return null;
            }
            ByteString b = response.getFirstPart(0);
            T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
            return ci.getPromotedValueFromProto(t);
          }
        }, sumCallBack);
    return sumCallBack.getSumResult();
  }

  /**
   * It computes average while fetching sum and row count from all the
   * corresponding regions. Approach is to compute a global sum of region level
   * sum and rowcount and then compute the average.
   * @param tableName table to aggregate over; opened and closed by this call
   * @param ci column interpreter used to decode and add values
   * @param scan scan describing the row range and the single column family
   * @return pair of (global sum, global row count)
   * @throws Throwable if validation or the coprocessor invocation fails
   */
  private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
      final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
      throws Throwable {
    try (Table table = connection.getTable(tableName)) {
      return getAvgArgs(table, ci, scan);
    }
  }

  /**
   * It computes average while fetching sum and row count from all the
   * corresponding regions. Approach is to compute a global sum of region level
   * sum and rowcount and then compute the average.
   * @param table table to aggregate over; not closed by this call
   * @param ci column interpreter used to decode and add values
   * @param scan scan describing the row range and the single column family
   * @return pair of (global sum, global row count)
   * @throws Throwable if validation or the coprocessor invocation fails
   */
  private <R, S, P extends Message, Q extends Message, T extends Message>
  Pair<S, Long> getAvgArgs(final Table table,
      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
    class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
      S sum = null;
      // Primitive accumulator: avoids re-boxing on every region update.
      long rowCount = 0L;

      public synchronized Pair<S, Long> getAvgArgs() {
        return new Pair<S, Long>(sum, rowCount);
      }

      @Override
      public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
        sum = ci.add(sum, result.getFirst());
        rowCount += result.getSecond();
      }
    }
    AvgCallBack avgCallBack = new AvgCallBack();
    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
        new Batch.Call<AggregateService, Pair<S, Long>>() {
          @Override
          public Pair<S, Long> call(AggregateService instance) throws IOException {
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<AggregateResponse> rpcCallback = new BlockingRpcCallback<>();
            instance.getAvg(controller, requestArg, rpcCallback);
            AggregateResponse response = awaitResponse(controller, rpcCallback);
            Pair<S, Long> pair = new Pair<S, Long>(null, 0L);
            if (response.getFirstPartCount() == 0) {
              return pair;
            }
            ByteString b = response.getFirstPart(0);
            T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
            pair.setFirst(ci.getPromotedValueFromProto(t));
            pair.setSecond(toLong(response.getSecondPart()));
            return pair;
          }
        }, avgCallBack);
    return avgCallBack.getAvgArgs();
  }

  /**
   * This is the client side interface/handle for calling the average method for
   * a given cf-cq combination. It was necessary to add one more call stack as
   * its return type should be a decimal value, irrespective of what
   * columninterpreter says. So, this methods collects the necessary parameters
   * to compute the average and returns the double value.
   * @param tableName table to aggregate over; opened and closed by this call
   * @param ci column interpreter used to decode, add and divide values
   * @param scan scan describing the row range and the single column family
   * @return global sum divided by global row count, as computed by {@code ci.divideForAvg}
   * @throws Throwable if validation or the coprocessor invocation fails
   */
  public <R, S, P extends Message, Q extends Message, T extends Message>
  double avg(final TableName tableName,
      final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
    Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
    return ci.divideForAvg(p.getFirst(), p.getSecond());
  }

  /**
   * This is the client side interface/handle for calling the average method for
   * a given cf-cq combination. It was necessary to add one more call stack as
   * its return type should be a decimal value, irrespective of what
   * columninterpreter says. So, this methods collects the necessary parameters
   * to compute the average and returns the double value.
   * @param table table to aggregate over; not closed by this call
   * @param ci column interpreter used to decode, add and divide values
   * @param scan scan describing the row range and the single column family
   * @return global sum divided by global row count, as computed by {@code ci.divideForAvg}
   * @throws Throwable if validation or the coprocessor invocation fails
   */
  public <R, S, P extends Message, Q extends Message, T extends Message> double avg(
      final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
    Pair<S, Long> p = getAvgArgs(table, ci, scan);
    return ci.divideForAvg(p.getFirst(), p.getSecond());
  }

  /**
   * It computes a global standard deviation for a given column and its value.
   * Standard deviation is square root of (average of squares -
   * average*average). From individual regions, it obtains sum, square sum and
   * number of rows. With these, the above values are computed to get the global
   * std.
   * @param table table to aggregate over; not closed by this call
   * @param ci column interpreter used to decode and add values
   * @param scan scan describing the row range and the single column family
   * @return pair whose first element is the list (sum, sum of squares) and whose second
   *         element is the row count, accumulated over all regions
   * @throws Throwable if validation or the coprocessor invocation fails
   */
  private <R, S, P extends Message, Q extends Message, T extends Message>
  Pair<List<S>, Long> getStdArgs(final Table table,
      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
    class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
      long rowCountVal = 0L;
      S sumVal = null, sumSqVal = null;

      public synchronized Pair<List<S>, Long> getStdParams() {
        List<S> l = new ArrayList<S>();
        l.add(sumVal);
        l.add(sumSqVal);
        return new Pair<List<S>, Long>(l, rowCountVal);
      }

      @Override
      public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
        // Regions that returned nothing contribute an empty list and are skipped.
        if (result.getFirst().size() > 0) {
          sumVal = ci.add(sumVal, result.getFirst().get(0));
          sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
          rowCountVal += result.getSecond();
        }
      }
    }
    StdCallback stdCallback = new StdCallback();
    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
        new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
          @Override
          public Pair<List<S>, Long> call(AggregateService instance) throws IOException {
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<AggregateResponse> rpcCallback = new BlockingRpcCallback<>();
            instance.getStd(controller, requestArg, rpcCallback);
            AggregateResponse response = awaitResponse(controller, rpcCallback);
            Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
            if (response.getFirstPartCount() == 0) {
              return pair;
            }
            List<S> list = new ArrayList<S>();
            for (int i = 0; i < response.getFirstPartCount(); i++) {
              ByteString b = response.getFirstPart(i);
              T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
              list.add(ci.getPromotedValueFromProto(t));
            }
            pair.setFirst(list);
            pair.setSecond(toLong(response.getSecondPart()));
            return pair;
          }
        }, stdCallback);
    return stdCallback.getStdParams();
  }

  /**
   * This is the client side interface/handle for calling the std method for a
   * given cf-cq combination. It was necessary to add one more call stack as its
   * return type should be a decimal value, irrespective of what
   * columninterpreter says. So, this methods collects the necessary parameters
   * to compute the std and returns the double value.
   * @param tableName table to aggregate over; opened and closed by this call
   * @param ci column interpreter used to decode, add and divide values
   * @param scan scan describing the row range and the single column family
   * @return the standard deviation
   * @throws Throwable if validation or the coprocessor invocation fails
   */
  public <R, S, P extends Message, Q extends Message, T extends Message>
  double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
      Scan scan) throws Throwable {
    try (Table table = connection.getTable(tableName)) {
      return std(table, ci, scan);
    }
  }

  /**
   * This is the client side interface/handle for calling the std method for a
   * given cf-cq combination. It was necessary to add one more call stack as its
   * return type should be a decimal value, irrespective of what
   * columninterpreter says. So, this methods collects the necessary parameters
   * to compute the std and returns the double value.
   * @param table table to aggregate over; not closed by this call
   * @param ci column interpreter used to decode, add and divide values
   * @param scan scan describing the row range and the single column family
   * @return the standard deviation
   * @throws Throwable if validation or the coprocessor invocation fails
   */
  public <R, S, P extends Message, Q extends Message, T extends Message> double std(
      final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
    Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
    double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
    double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
    double variance = avgOfSumSq - (avg) * (avg);
    return Math.pow(variance, 0.5);
  }

  /**
   * It helps locate the region with median for a given column whose weight
   * is specified in an optional column.
   * From individual regions, it obtains sum of values and sum of weights.
   * @param table table to aggregate over; not closed by this call
   * @param ci column interpreter used to decode and add values
   * @param scan scan describing the row range and the single column family
   * @return pair whose first element is a map between start row of the region
   *  and (sum of values, sum of weights) for the region, the second element is
   *  (sum of values, sum of weights) for all the regions chosen
   * @throws Throwable if validation or the coprocessor invocation fails
   */
  private <R, S, P extends Message, Q extends Message, T extends Message>
  Pair<NavigableMap<byte[], List<S>>, List<S>>
  getMedianArgs(final Table table,
      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
    final NavigableMap<byte[], List<S>> map =
        new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
    class MedianCallback implements Batch.Callback<List<S>> {
      S sumVal = null, sumWeights = null;

      public synchronized Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
        List<S> l = new ArrayList<S>();
        l.add(sumVal);
        l.add(sumWeights);
        return new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l);
      }

      @Override
      public synchronized void update(byte[] region, byte[] row, List<S> result) {
        map.put(row, result);
        sumVal = ci.add(sumVal, result.get(0));
        sumWeights = ci.add(sumWeights, result.get(1));
      }
    }
    MedianCallback medianCallback = new MedianCallback();
    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
        new Batch.Call<AggregateService, List<S>>() {
          @Override
          public List<S> call(AggregateService instance) throws IOException {
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<AggregateResponse> rpcCallback = new BlockingRpcCallback<>();
            instance.getMedian(controller, requestArg, rpcCallback);
            AggregateResponse response = awaitResponse(controller, rpcCallback);

            List<S> list = new ArrayList<S>();
            for (int i = 0; i < response.getFirstPartCount(); i++) {
              ByteString b = response.getFirstPart(i);
              T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
              list.add(ci.getPromotedValueFromProto(t));
            }
            return list;
          }

        }, medianCallback);
    return medianCallback.getMedianParams();
  }

  /**
   * This is the client side interface/handler for calling the median method for a
   * given cf-cq combination. This method collects the necessary parameters
   * to compute the median and returns the median.
   * @param tableName table to aggregate over; opened and closed by this call
   * @param ci column interpreter used to decode, add and divide values
   * @param scan scan describing the row range, the single column family and its qualifier(s)
   * @return R the median
   * @throws Throwable if validation, the coprocessor invocation or the follow-up scan fails
   */
  public <R, S, P extends Message, Q extends Message, T extends Message>
  R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
      Scan scan) throws Throwable {
    try (Table table = connection.getTable(tableName)) {
      return median(table, ci, scan);
    }
  }

  /**
   * This is the client side interface/handler for calling the median method for a
   * given cf-cq combination. This method collects the necessary parameters
   * to compute the median and returns the median.
   * <p>
   * When the scan names more than one qualifier for the family, the first qualifier is the
   * value column and the last one is the weight column; otherwise the single qualifier is
   * used for both. The qualifier set of the supplied scan is only read, never modified.
   * NOTE(review): the qualifier set is dereferenced unconditionally, so the scan presumably
   * must name at least one qualifier - confirm against callers.
   * @param table table to aggregate over; not closed by this call
   * @param ci column interpreter used to decode, add and divide values
   * @param scan scan describing the row range, the single column family and its qualifier(s)
   * @return R the median, or null if the follow-up scan ends before the half-way point
   * @throws Throwable if validation, the coprocessor invocation or the follow-up scan fails
   */
  public <R, S, P extends Message, Q extends Message, T extends Message>
  R median(final Table table, ColumnInterpreter<R, S, P, Q, T> ci,
      Scan scan) throws Throwable {
    Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan);
    byte[] startRow = null;
    byte[] colFamily = scan.getFamilies()[0];
    NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
    NavigableMap<byte[], List<S>> map = p.getFirst();
    S sumVal = p.getSecond().get(0);
    S sumWeights = p.getSecond().get(1);
    double halfSumVal = ci.divideForAvg(sumVal, 2L);
    double movingSumVal = 0;
    boolean weighted = false;
    if (quals.size() > 1) {
      weighted = true;
      halfSumVal = ci.divideForAvg(sumWeights, 2L);
    }

    // Walk the per-region sums in row order until adding a region would cross the half-way mark.
    for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
      S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
      double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
      if (newSumVal > halfSumVal) {
        break; // we found the region with the median
      }
      movingSumVal = newSumVal;
      startRow = entry.getKey();
    }
    // scan the region with median and find it
    Scan scan2 = new Scan(scan);
    // inherit stop row from method parameter
    if (startRow != null) {
      scan2.setStartRow(startRow);
    }
    int cacheSize = scan2.getCaching();
    if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
      scan2.setCacheBlocks(true);
      cacheSize = 5;
      scan2.setCaching(cacheSize);
    }
    try (ResultScanner scanner = table.getScanner(scan2)) {
      // Read (do not poll) the qualifiers so the caller's Scan keeps its columns.
      byte[] qualifier = quals.isEmpty() ? null : quals.first();
      // qualifier for the weight column
      byte[] weightQualifier = weighted ? quals.last() : qualifier;
      R value = null;
      Result[] results;
      do {
        results = scanner.next(cacheSize);
        if (results != null && results.length > 0) {
          for (Result r : results) {
            // retrieve weight
            Cell kv = r.getColumnLatest(colFamily, weightQualifier);
            R newValue = ci.getValue(colFamily, weightQualifier, kv);
            S s = ci.castToReturnType(newValue);
            double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
            // see if we have moved past the median
            if (newSumVal > halfSumVal) {
              return value;
            }
            movingSumVal = newSumVal;
            kv = r.getColumnLatest(colFamily, qualifier);
            value = ci.getValue(colFamily, qualifier, kv);
          }
        }
      } while (results != null && results.length > 0);
    }
    return null;
  }

  /**
   * Validates the scan and builds the protobuf request shared by all aggregate calls.
   * The request carries the interpreter's canonical class name, the interpreter's request
   * data when {@code ci.getRequestData()} is non-null, and the protobuf form of the scan.
   * @param scan scan to validate and embed in the request
   * @param ci column interpreter whose class name and request data are sent
   * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan
   * @return the built request
   * @throws IOException if the scan fails validation
   */
  <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest
  validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci, boolean canFamilyBeAbsent)
      throws IOException {
    validateParameters(scan, canFamilyBeAbsent);
    final AggregateRequest.Builder requestBuilder =
        AggregateRequest.newBuilder();
    requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
    P columnInterpreterSpecificData = ci.getRequestData();
    if (columnInterpreterSpecificData != null) {
      requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
    }
    requestBuilder.setScan(ProtobufUtil.toScan(scan));
    return requestBuilder.build();
  }

  /**
   * Copies the content of a protobuf byte string into a new byte array.
   * @param response byte string taken from an aggregate response
   * @return a copy of the bytes in {@code response}
   */
  byte[] getBytesFromResponse(ByteString response) {
    // A read-only ByteBuffer view never exposes a backing array, so copying is the only path.
    return response.toByteArray();
  }

  /**
   * Blocks until the RPC callback has a response, then surfaces any failure recorded on the
   * controller.
   * @param controller controller that was passed to the service call
   * @param rpcCallback callback that was passed to the service call
   * @return the response delivered to the callback
   * @throws IOException if waiting fails or the controller recorded a failure
   */
  private static AggregateResponse awaitResponse(ServerRpcController controller,
      BlockingRpcCallback<AggregateResponse> rpcCallback) throws IOException {
    AggregateResponse response = rpcCallback.get();
    if (controller.failedOnException()) {
      throw controller.getFailedOn();
    }
    return response;
  }

  /**
   * Decodes a long from a response field by copying its bytes into an 8-byte buffer and
   * reading the buffer from the start.
   * @param value byte string holding the encoded long
   * @return the decoded value
   */
  private long toLong(ByteString value) {
    ByteBuffer bb = ByteBuffer.allocate(8).put(getBytesFromResponse(value));
    bb.rewind();
    return bb.getLong();
  }
}