1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hbase.replication.regionserver; 19 20 import org.apache.hadoop.hbase.classification.InterfaceAudience; 21 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 22 23 /** 24 * Per-peer per-node throttling controller for replication: enabled if 25 * bandwidth > 0, a cycle = 100ms, by throttling we guarantee data pushed 26 * to peer within each cycle won't exceed 'bandwidth' bytes 27 */ 28 @InterfaceAudience.Private 29 public class ReplicationThrottler { 30 private final boolean enabled; 31 private final double bandwidth; 32 private long cyclePushSize; 33 private long cycleStartTick; 34 35 /** 36 * ReplicationThrottler constructor 37 * If bandwidth less than 1, throttling is disabled 38 * @param bandwidth per cycle(100ms) 39 */ ReplicationThrottler(final double bandwidth)40 public ReplicationThrottler(final double bandwidth) { 41 this.bandwidth = bandwidth; 42 this.enabled = this.bandwidth > 0; 43 if (this.enabled) { 44 this.cyclePushSize = 0; 45 this.cycleStartTick = EnvironmentEdgeManager.currentTime(); 46 } 47 } 48 49 /** 50 * If throttling is enabled 51 * @return true if throttling is enabled 52 */ isEnabled()53 public boolean isEnabled() { 54 return this.enabled; 55 } 56 57 /** 58 * Get how long the caller should sleep according to the current size and 59 * current cycle's total push size and start tick, return the sleep interval 60 * for throttling control. 61 * @param size is the size of edits to be pushed 62 * @return sleep interval for throttling control 63 */ getNextSleepInterval(final int size)64 public long getNextSleepInterval(final int size) { 65 if (!this.enabled) { 66 return 0; 67 } 68 69 long sleepTicks = 0; 70 long now = EnvironmentEdgeManager.currentTime(); 71 // 1. if cyclePushSize exceeds bandwidth, we need to sleep some 72 // following cycles to amortize, this case can occur when a single push 73 // exceeds the bandwidth 74 if ((double)this.cyclePushSize > bandwidth) { 75 double cycles = Math.ceil((double)this.cyclePushSize / bandwidth); 76 long shouldTillTo = this.cycleStartTick + (long)(cycles * 100); 77 if (shouldTillTo > now) { 78 sleepTicks = shouldTillTo - now; 79 } else { 80 // no reset in shipEdits since no sleep, so we need to reset cycleStartTick here! 81 this.cycleStartTick = now; 82 } 83 this.cyclePushSize = 0; 84 } else { 85 long nextCycleTick = this.cycleStartTick + 100; //a cycle is 100ms 86 if (now >= nextCycleTick) { 87 // 2. switch to next cycle if the current cycle has passed 88 this.cycleStartTick = now; 89 this.cyclePushSize = 0; 90 } else if (this.cyclePushSize > 0 && 91 (double)(this.cyclePushSize + size) >= bandwidth) { 92 // 3. delay the push to next cycle if exceeds throttling bandwidth. 93 // enforcing cyclePushSize > 0 to avoid the unnecessary sleep for case 94 // where a cycle's first push size(currentSize) > bandwidth 95 sleepTicks = nextCycleTick - now; 96 this.cyclePushSize = 0; 97 } 98 } 99 return sleepTicks; 100 } 101 102 /** 103 * Add current size to the current cycle's total push size 104 * @param size is the current size added to the current cycle's 105 * total push size 106 */ addPushSize(final int size)107 public void addPushSize(final int size) { 108 if (this.enabled) { 109 this.cyclePushSize += size; 110 } 111 } 112 113 /** 114 * Reset the cycle start tick to NOW 115 */ resetStartTick()116 public void resetStartTick() { 117 if (this.enabled) { 118 this.cycleStartTick = EnvironmentEdgeManager.currentTime(); 119 } 120 } 121 } 122