1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 // This code was found and refactored from here: 20 // https://stackoverflow.com/questions/11182192/how-do-i-serve-https-and-http-for-jetty-from-one-port/40076056#40076056 21 22 package org.apache.zookeeper.server.admin; 23 24 import java.io.IOException; 25 import java.net.InetSocketAddress; 26 import java.nio.ByteBuffer; 27 import java.nio.channels.ReadPendingException; 28 import java.nio.channels.WritePendingException; 29 import org.eclipse.jetty.io.Connection; 30 import org.eclipse.jetty.io.EndPoint; 31 import org.eclipse.jetty.util.Callback; 32 33 public class ReadAheadEndpoint implements EndPoint { 34 35 private final EndPoint endPoint; 36 private final ByteBuffer start; 37 private final byte[] bytes; 38 private int leftToRead; 39 private IOException pendingException = null; 40 41 @Override getLocalAddress()42 public InetSocketAddress getLocalAddress() { 43 return endPoint.getLocalAddress(); 44 } 45 @Override getRemoteAddress()46 public InetSocketAddress getRemoteAddress() { 47 return endPoint.getRemoteAddress(); 48 } 49 @Override isOpen()50 public boolean isOpen() { 51 return endPoint.isOpen(); 52 } 53 @Override getCreatedTimeStamp()54 public long getCreatedTimeStamp() { 55 return endPoint.getCreatedTimeStamp(); 56 } 57 @Override isOutputShutdown()58 public boolean isOutputShutdown() { 59 return endPoint.isOutputShutdown(); 60 } 61 @Override isInputShutdown()62 public boolean isInputShutdown() { 63 return endPoint.isInputShutdown(); 64 } 65 @Override shutdownOutput()66 public void shutdownOutput() { 67 endPoint.shutdownOutput(); 68 } 69 @Override close()70 public void close() { 71 endPoint.close(); 72 } 73 @Override getTransport()74 public Object getTransport() { 75 return endPoint.getTransport(); 76 } 77 @Override getIdleTimeout()78 public long getIdleTimeout() { 79 return endPoint.getIdleTimeout(); 80 } 81 @Override getConnection()82 public Connection getConnection() { 83 return endPoint.getConnection(); 84 } 85 @Override onOpen()86 public void onOpen() { 87 endPoint.onOpen(); 88 } 89 @Override onClose()90 public void onClose() { 91 endPoint.onClose(); 92 } 93 @Override isOptimizedForDirectBuffers()94 public boolean isOptimizedForDirectBuffers() { 95 return endPoint.isOptimizedForDirectBuffers(); 96 } 97 @Override isFillInterested()98 public boolean isFillInterested() { 99 return endPoint.isFillInterested(); 100 } 101 @Override tryFillInterested(Callback v)102 public boolean tryFillInterested(Callback v) { 103 return endPoint.tryFillInterested(v); 104 } 105 @Override flush(ByteBuffer... v)106 public boolean flush(ByteBuffer... v) throws IOException { 107 return endPoint.flush(v); 108 } 109 @Override setIdleTimeout(long v)110 public void setIdleTimeout(long v) { 111 endPoint.setIdleTimeout(v); 112 } 113 @Override write(Callback v, ByteBuffer... b)114 public void write(Callback v, ByteBuffer... b) throws WritePendingException { 115 endPoint.write(v, b); 116 } 117 @Override setConnection(Connection v)118 public void setConnection(Connection v) { 119 endPoint.setConnection(v); 120 } 121 @Override upgrade(Connection v)122 public void upgrade(Connection v) { 123 endPoint.upgrade(v); 124 } 125 @Override fillInterested(Callback v)126 public void fillInterested(Callback v) throws ReadPendingException { 127 endPoint.fillInterested(v); 128 } 129 ReadAheadEndpoint(final EndPoint channel, final int readAheadLength)130 public ReadAheadEndpoint(final EndPoint channel, final int readAheadLength) { 131 if (channel == null) { 132 throw new IllegalArgumentException("channel cannot be null"); 133 } 134 135 this.endPoint = channel; 136 start = ByteBuffer.wrap(bytes = new byte[readAheadLength]); 137 start.flip(); 138 leftToRead = readAheadLength; 139 } 140 readAhead()141 private synchronized void readAhead() throws IOException { 142 if (leftToRead > 0) { 143 int n = 0; 144 do { 145 n = endPoint.fill(start); 146 } while (n == 0 && endPoint.isOpen() && !endPoint.isInputShutdown()); 147 if (n == -1) { 148 leftToRead = -1; 149 } else { 150 leftToRead -= n; 151 } 152 if (leftToRead <= 0) { 153 start.rewind(); 154 } 155 } 156 } 157 readFromStart(final ByteBuffer dst)158 private int readFromStart(final ByteBuffer dst) throws IOException { 159 final int n = Math.min(dst.remaining(), start.remaining()); 160 if (n > 0) { 161 dst.put(bytes, start.position(), n); 162 start.position(start.position() + n); 163 dst.flip(); 164 } 165 return n; 166 } 167 168 @Override fill(final ByteBuffer dst)169 public synchronized int fill(final ByteBuffer dst) throws IOException { 170 throwPendingException(); 171 if (leftToRead > 0) { 172 readAhead(); 173 } 174 if (leftToRead > 0) { 175 return 0; 176 } 177 final int sr = start.remaining(); 178 if (sr > 0) { 179 dst.compact(); 180 final int n = readFromStart(dst); 181 if (n < sr) { 182 return n; 183 } 184 } 185 return sr + endPoint.fill(dst); 186 } 187 getBytes()188 public byte[] getBytes() { 189 if (pendingException == null) { 190 try { 191 readAhead(); 192 } catch (IOException e) { 193 pendingException = e; 194 } 195 } 196 byte[] ret = new byte[bytes.length]; 197 System.arraycopy(bytes, 0, ret, 0, ret.length); 198 return ret; 199 } 200 throwPendingException()201 private void throwPendingException() throws IOException { 202 if (pendingException != null) { 203 IOException e = pendingException; 204 pendingException = null; 205 throw e; 206 } 207 } 208 209 } 210