1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hbase.replication.regionserver;
19 
20 import org.apache.hadoop.hbase.classification.InterfaceAudience;
21 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
22 
23 /**
24  * Per-peer per-node throttling controller for replication: enabled if
25  * bandwidth > 0, a cycle = 100ms, by throttling we guarantee data pushed
26  * to peer within each cycle won't exceed 'bandwidth' bytes
27  */
28 @InterfaceAudience.Private
29 public class ReplicationThrottler {
30   private final boolean enabled;
31   private final double bandwidth;
32   private long cyclePushSize;
33   private long cycleStartTick;
34 
35   /**
36    * ReplicationThrottler constructor
37    * If bandwidth less than 1, throttling is disabled
38    * @param bandwidth per cycle(100ms)
39    */
ReplicationThrottler(final double bandwidth)40   public ReplicationThrottler(final double bandwidth) {
41     this.bandwidth = bandwidth;
42     this.enabled = this.bandwidth > 0;
43     if (this.enabled) {
44       this.cyclePushSize = 0;
45       this.cycleStartTick = EnvironmentEdgeManager.currentTime();
46     }
47   }
48 
49   /**
50    * If throttling is enabled
51    * @return true if throttling is enabled
52    */
isEnabled()53   public boolean isEnabled() {
54     return this.enabled;
55   }
56 
57   /**
58    * Get how long the caller should sleep according to the current size and
59    * current cycle's total push size and start tick, return the sleep interval
60    * for throttling control.
61    * @param size is the size of edits to be pushed
62    * @return sleep interval for throttling control
63    */
getNextSleepInterval(final int size)64   public long getNextSleepInterval(final int size) {
65     if (!this.enabled) {
66       return 0;
67     }
68 
69     long sleepTicks = 0;
70     long now = EnvironmentEdgeManager.currentTime();
71     // 1. if cyclePushSize exceeds bandwidth, we need to sleep some
72     //    following cycles to amortize, this case can occur when a single push
73     //    exceeds the bandwidth
74     if ((double)this.cyclePushSize > bandwidth) {
75       double cycles = Math.ceil((double)this.cyclePushSize / bandwidth);
76       long shouldTillTo = this.cycleStartTick + (long)(cycles * 100);
77       if (shouldTillTo > now) {
78         sleepTicks = shouldTillTo - now;
79       } else {
80         // no reset in shipEdits since no sleep, so we need to reset cycleStartTick here!
81         this.cycleStartTick = now;
82       }
83       this.cyclePushSize = 0;
84     } else {
85       long nextCycleTick = this.cycleStartTick + 100;  //a cycle is 100ms
86       if (now >= nextCycleTick) {
87         // 2. switch to next cycle if the current cycle has passed
88         this.cycleStartTick = now;
89         this.cyclePushSize = 0;
90       } else if (this.cyclePushSize > 0 &&
91           (double)(this.cyclePushSize + size) >= bandwidth) {
92         // 3. delay the push to next cycle if exceeds throttling bandwidth.
93         //    enforcing cyclePushSize > 0 to avoid the unnecessary sleep for case
94         //    where a cycle's first push size(currentSize) > bandwidth
95         sleepTicks = nextCycleTick - now;
96         this.cyclePushSize = 0;
97       }
98     }
99     return sleepTicks;
100   }
101 
102   /**
103    * Add current size to the current cycle's total push size
104    * @param size is the current size added to the current cycle's
105    * total push size
106    */
addPushSize(final int size)107   public void addPushSize(final int size) {
108     if (this.enabled) {
109       this.cyclePushSize += size;
110     }
111   }
112 
113   /**
114    * Reset the cycle start tick to NOW
115    */
resetStartTick()116   public void resetStartTick() {
117     if (this.enabled) {
118       this.cycleStartTick = EnvironmentEdgeManager.currentTime();
119     }
120   }
121 }
122