1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.zookeeper.server.controller;
20 
21 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
22 import java.io.IOException;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.SocketChannel;
25 import org.apache.zookeeper.server.NIOServerCnxn;
26 import org.apache.zookeeper.server.NIOServerCnxnFactory;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 
30 /**
31  * Extension of NIOServerCnxnFactory which can inject changes per controller commands.
32  * Similar extensions can implement on top of NettyServerCnxnFactory as well.
33  */
34 @SuppressFBWarnings(value = "SWL_SLEEP_WITH_LOCK_HELD", justification = "no dead lock")
35 public class ControllableConnectionFactory extends NIOServerCnxnFactory {
36     private static final Logger LOG = LoggerFactory.getLogger(ControllableConnectionFactory.class);
37     private long responseDelayInMs = 0;
38     private long remainingRequestsToFail = 0;
39     private long remainingResponsesToHold = 0;
40 
ControllableConnectionFactory()41     public ControllableConnectionFactory() {
42     }
43 
44     @Override
createConnection(SocketChannel sock, SelectionKey sk, SelectorThread selectorThread)45     protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk, SelectorThread selectorThread)
46             throws IOException {
47         return new ControllableConnection(zkServer, sock, sk, this, selectorThread);
48     }
49 
50     /**
51      * Called by the connection to delay processing requests from the client.
52     */
delayRequestIfNeeded()53     public synchronized void delayRequestIfNeeded() {
54         try {
55             if (responseDelayInMs > 0) {
56                 Thread.sleep(responseDelayInMs);
57             }
58         } catch (InterruptedException ex) {
59             LOG.warn("Interrupted while delaying requests", ex);
60         }
61     }
62 
63     /**
64      * Check if we should fail the next incoming request.
65      * If so, decrement the remaining requests to fail.
66     */
shouldFailNextRequest()67     public synchronized boolean shouldFailNextRequest() {
68         if (remainingRequestsToFail == 0) {
69             return false;
70         }
71 
72         // Value < 0 indicates fail all requests.
73         if (remainingRequestsToFail > 0) {
74             remainingRequestsToFail--;
75         }
76 
77         return true;
78     }
79 
80     /**
81      * Check if we should send a response to the latest processed request (true),
82      * or eat the response to mess with the client (false).
83      * If so, decrement the remaining requests to eat.
84     */
shouldSendResponse()85     public synchronized boolean shouldSendResponse() {
86         if (remainingResponsesToHold == 0) {
87             return true;
88         }
89 
90         // Value < 0 indicates hold all the responses.
91         if (remainingResponsesToHold > 0) {
92             remainingResponsesToHold--;
93         }
94         return false;
95     }
96 
delayResponses(long delayInMs)97     public synchronized void delayResponses(long delayInMs) {
98         if (delayInMs < 0) {
99             throw new IllegalArgumentException("delay must be non-negative");
100         }
101         responseDelayInMs = delayInMs;
102     }
103 
resetBadBehavior()104     public synchronized void resetBadBehavior() {
105         responseDelayInMs = 0;
106         remainingRequestsToFail = 0;
107         remainingResponsesToHold = 0;
108     }
109 
failAllFutureRequests()110     public synchronized void failAllFutureRequests() {
111         this.remainingRequestsToFail = -1;
112     }
113 
failFutureRequests(long requestsToFail)114     public synchronized void failFutureRequests(long requestsToFail) {
115         this.remainingRequestsToFail = requestsToFail;
116     }
117 
holdAllFutureResponses()118     public synchronized void holdAllFutureResponses() {
119         this.remainingResponsesToHold = -1;
120     }
121 
holdFutureResponses(long requestsToHold)122     public synchronized void holdFutureResponses(long requestsToHold) {
123         this.remainingResponsesToHold = requestsToHold;
124     }
125 }
126