1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 // This code was found and refactored from here:
20 // https://stackoverflow.com/questions/11182192/how-do-i-serve-https-and-http-for-jetty-from-one-port/40076056#40076056
21 
22 package org.apache.zookeeper.server.admin;
23 
24 import java.io.IOException;
25 import java.net.InetSocketAddress;
26 import java.nio.ByteBuffer;
27 import java.nio.channels.ReadPendingException;
28 import java.nio.channels.WritePendingException;
29 import org.eclipse.jetty.io.Connection;
30 import org.eclipse.jetty.io.EndPoint;
31 import org.eclipse.jetty.util.Callback;
32 
33 public class ReadAheadEndpoint implements EndPoint {
34 
35     private final EndPoint endPoint;
36     private final ByteBuffer start;
37     private final byte[] bytes;
38     private int leftToRead;
39     private IOException pendingException = null;
40 
41     @Override
getLocalAddress()42     public InetSocketAddress getLocalAddress() {
43         return endPoint.getLocalAddress();
44     }
45     @Override
getRemoteAddress()46     public InetSocketAddress getRemoteAddress() {
47         return endPoint.getRemoteAddress();
48     }
49     @Override
isOpen()50     public boolean isOpen() {
51         return endPoint.isOpen();
52     }
53     @Override
getCreatedTimeStamp()54     public long getCreatedTimeStamp() {
55         return endPoint.getCreatedTimeStamp();
56     }
57     @Override
isOutputShutdown()58     public boolean isOutputShutdown() {
59         return endPoint.isOutputShutdown();
60     }
61     @Override
isInputShutdown()62     public boolean isInputShutdown() {
63         return endPoint.isInputShutdown();
64     }
65     @Override
shutdownOutput()66     public void shutdownOutput() {
67         endPoint.shutdownOutput();
68     }
69     @Override
close()70     public void close() {
71         endPoint.close();
72     }
73     @Override
getTransport()74     public Object getTransport() {
75         return endPoint.getTransport();
76     }
77     @Override
getIdleTimeout()78     public long getIdleTimeout() {
79         return endPoint.getIdleTimeout();
80     }
81     @Override
getConnection()82     public Connection getConnection() {
83         return endPoint.getConnection();
84     }
85     @Override
onOpen()86     public void onOpen() {
87         endPoint.onOpen();
88     }
89     @Override
onClose()90     public void onClose() {
91         endPoint.onClose();
92     }
93     @Override
isOptimizedForDirectBuffers()94     public boolean isOptimizedForDirectBuffers() {
95         return endPoint.isOptimizedForDirectBuffers();
96     }
97     @Override
isFillInterested()98     public boolean isFillInterested() {
99         return endPoint.isFillInterested();
100     }
101     @Override
tryFillInterested(Callback v)102     public boolean tryFillInterested(Callback v) {
103         return endPoint.tryFillInterested(v);
104     }
105     @Override
flush(ByteBuffer... v)106     public boolean flush(ByteBuffer... v) throws IOException {
107         return endPoint.flush(v);
108     }
109     @Override
setIdleTimeout(long v)110     public void setIdleTimeout(long v) {
111         endPoint.setIdleTimeout(v);
112     }
113     @Override
write(Callback v, ByteBuffer... b)114     public void write(Callback v, ByteBuffer... b) throws WritePendingException {
115         endPoint.write(v, b);
116     }
117     @Override
setConnection(Connection v)118     public void setConnection(Connection v) {
119         endPoint.setConnection(v);
120     }
121     @Override
upgrade(Connection v)122     public void upgrade(Connection v) {
123         endPoint.upgrade(v);
124     }
125     @Override
fillInterested(Callback v)126     public void fillInterested(Callback v) throws ReadPendingException {
127         endPoint.fillInterested(v);
128     }
129 
ReadAheadEndpoint(final EndPoint channel, final int readAheadLength)130     public ReadAheadEndpoint(final EndPoint channel, final int readAheadLength) {
131         if (channel == null) {
132             throw new IllegalArgumentException("channel cannot be null");
133         }
134 
135         this.endPoint = channel;
136         start = ByteBuffer.wrap(bytes = new byte[readAheadLength]);
137         start.flip();
138         leftToRead = readAheadLength;
139     }
140 
readAhead()141     private synchronized void readAhead() throws IOException {
142         if (leftToRead > 0) {
143             int n = 0;
144             do {
145                 n = endPoint.fill(start);
146             } while (n == 0 && endPoint.isOpen() && !endPoint.isInputShutdown());
147             if (n == -1) {
148                 leftToRead = -1;
149             } else {
150                 leftToRead -= n;
151             }
152             if (leftToRead <= 0) {
153                 start.rewind();
154             }
155         }
156     }
157 
readFromStart(final ByteBuffer dst)158     private int readFromStart(final ByteBuffer dst) throws IOException {
159         final int n = Math.min(dst.remaining(), start.remaining());
160         if (n > 0) {
161             dst.put(bytes, start.position(), n);
162             start.position(start.position() + n);
163             dst.flip();
164         }
165         return n;
166     }
167 
168     @Override
fill(final ByteBuffer dst)169     public synchronized int fill(final ByteBuffer dst) throws IOException {
170         throwPendingException();
171         if (leftToRead > 0) {
172             readAhead();
173         }
174         if (leftToRead > 0) {
175             return 0;
176         }
177         final int sr = start.remaining();
178         if (sr > 0) {
179             dst.compact();
180             final int n = readFromStart(dst);
181             if (n < sr) {
182                 return n;
183             }
184         }
185         return sr + endPoint.fill(dst);
186     }
187 
getBytes()188     public byte[] getBytes() {
189         if (pendingException == null) {
190             try {
191                 readAhead();
192             } catch (IOException e) {
193                 pendingException = e;
194             }
195         }
196         byte[] ret = new byte[bytes.length];
197         System.arraycopy(bytes, 0, ret, 0, ret.length);
198         return ret;
199     }
200 
throwPendingException()201     private void throwPendingException() throws IOException {
202         if (pendingException != null) {
203             IOException e = pendingException;
204             pendingException = null;
205             throw e;
206         }
207     }
208 
209 }
210