1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.zookeeper.server.controller; 20 21 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; 22 import java.io.IOException; 23 import java.nio.channels.SelectionKey; 24 import java.nio.channels.SocketChannel; 25 import org.apache.zookeeper.server.NIOServerCnxn; 26 import org.apache.zookeeper.server.NIOServerCnxnFactory; 27 import org.slf4j.Logger; 28 import org.slf4j.LoggerFactory; 29 30 /** 31 * Extension of NIOServerCnxnFactory which can inject changes per controller commands. 32 * Similar extensions can implement on top of NettyServerCnxnFactory as well. 33 */ 34 @SuppressFBWarnings(value = "SWL_SLEEP_WITH_LOCK_HELD", justification = "no dead lock") 35 public class ControllableConnectionFactory extends NIOServerCnxnFactory { 36 private static final Logger LOG = LoggerFactory.getLogger(ControllableConnectionFactory.class); 37 private long responseDelayInMs = 0; 38 private long remainingRequestsToFail = 0; 39 private long remainingResponsesToHold = 0; 40 ControllableConnectionFactory()41 public ControllableConnectionFactory() { 42 } 43 44 @Override createConnection(SocketChannel sock, SelectionKey sk, SelectorThread selectorThread)45 protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk, SelectorThread selectorThread) 46 throws IOException { 47 return new ControllableConnection(zkServer, sock, sk, this, selectorThread); 48 } 49 50 /** 51 * Called by the connection to delay processing requests from the client. 52 */ delayRequestIfNeeded()53 public synchronized void delayRequestIfNeeded() { 54 try { 55 if (responseDelayInMs > 0) { 56 Thread.sleep(responseDelayInMs); 57 } 58 } catch (InterruptedException ex) { 59 LOG.warn("Interrupted while delaying requests", ex); 60 } 61 } 62 63 /** 64 * Check if we should fail the next incoming request. 65 * If so, decrement the remaining requests to fail. 66 */ shouldFailNextRequest()67 public synchronized boolean shouldFailNextRequest() { 68 if (remainingRequestsToFail == 0) { 69 return false; 70 } 71 72 // Value < 0 indicates fail all requests. 73 if (remainingRequestsToFail > 0) { 74 remainingRequestsToFail--; 75 } 76 77 return true; 78 } 79 80 /** 81 * Check if we should send a response to the latest processed request (true), 82 * or eat the response to mess with the client (false). 83 * If so, decrement the remaining requests to eat. 84 */ shouldSendResponse()85 public synchronized boolean shouldSendResponse() { 86 if (remainingResponsesToHold == 0) { 87 return true; 88 } 89 90 // Value < 0 indicates hold all the responses. 91 if (remainingResponsesToHold > 0) { 92 remainingResponsesToHold--; 93 } 94 return false; 95 } 96 delayResponses(long delayInMs)97 public synchronized void delayResponses(long delayInMs) { 98 if (delayInMs < 0) { 99 throw new IllegalArgumentException("delay must be non-negative"); 100 } 101 responseDelayInMs = delayInMs; 102 } 103 resetBadBehavior()104 public synchronized void resetBadBehavior() { 105 responseDelayInMs = 0; 106 remainingRequestsToFail = 0; 107 remainingResponsesToHold = 0; 108 } 109 failAllFutureRequests()110 public synchronized void failAllFutureRequests() { 111 this.remainingRequestsToFail = -1; 112 } 113 failFutureRequests(long requestsToFail)114 public synchronized void failFutureRequests(long requestsToFail) { 115 this.remainingRequestsToFail = requestsToFail; 116 } 117 holdAllFutureResponses()118 public synchronized void holdAllFutureResponses() { 119 this.remainingResponsesToHold = -1; 120 } 121 holdFutureResponses(long requestsToHold)122 public synchronized void holdFutureResponses(long requestsToHold) { 123 this.remainingResponsesToHold = requestsToHold; 124 } 125 } 126