1 /*
2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.internal.net.http.websocket;
27 
28 import jdk.internal.net.http.common.Logger;
29 import jdk.internal.net.http.common.Utils;
30 import jdk.internal.net.http.websocket.Frame.Opcode;
31 
32 import java.io.IOException;
33 import java.nio.ByteBuffer;
34 import java.nio.CharBuffer;
35 import java.nio.charset.CharacterCodingException;
36 import java.nio.charset.CharsetEncoder;
37 import java.nio.charset.CoderResult;
38 import java.nio.charset.CodingErrorAction;
39 import java.nio.charset.StandardCharsets;
40 import java.security.SecureRandom;
41 
42 /*
43  * A stateful producer of binary representations of WebSocket messages being
44  * sent from the client to the server.
45  *
 * An encoding method is given an original message and a byte buffer into
 * which the resulting bytes are put. The method is called until it returns
 * true. Then the reset method is called. The whole sequence repeats with the
 * next message.
49  */
50 public class MessageEncoder {
51 
52     private static final Logger debug =
53             Utils.getWebSocketLogger("[Output]"::toString, Utils.DEBUG_WS);
54 
55     private final SecureRandom maskingKeySource = new SecureRandom();
56     private final Frame.HeaderWriter headerWriter = new Frame.HeaderWriter();
57     private final Frame.Masker payloadMasker = new Frame.Masker();
58     private final CharsetEncoder charsetEncoder
59             = StandardCharsets.UTF_8.newEncoder()
60                                     .onMalformedInput(CodingErrorAction.REPORT)
61                                     .onUnmappableCharacter(CodingErrorAction.REPORT);
62     /*
63      * This buffer is used both to encode characters to UTF-8 and to calculate
64      * the length of the resulting frame's payload. The length of the payload
65      * must be known before the frame's header can be written.
66      * For implementation reasons, this buffer must have a capacity of at least
67      * the maximum size of a Close frame payload, which is 125 bytes
68      * (or Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH).
69      */
70     private final ByteBuffer intermediateBuffer = createIntermediateBuffer(
71             Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH);
72     private final ByteBuffer headerBuffer = ByteBuffer.allocate(
73             Frame.MAX_HEADER_SIZE_BYTES);
74 
75     private boolean started;
76     private boolean flushing;
77     private boolean moreText = true;
78     private long headerCount;
79     /* Has the previous frame got its fin flag set? */
80     private boolean previousFin = true;
81     /* Was the previous frame TEXT or a CONTINUATION thereof? */
82     private boolean previousText;
83     private boolean closed;
84 
85     /*
86      * How many bytes of the current message have been already encoded.
87      *
88      * Even though the user hands their buffers over to us, they still can
89      * manipulate these buffers while we are getting data out of them.
90      * The number of produced bytes guards us from such behaviour in the
91      * case of messages that must be restricted in size (Ping, Pong and Close).
92      * For other messages this measure provides a best-effort attempt to detect
93      * concurrent changes to buffer.
94      *
95      * Making a shallow copy (duplicate/slice) and then checking the size
96      * precondition on it would also solve the problem, but at the cost of this
97      * extra copy.
98      */
99     private int actualLen;
100 
101     /*
102      * How many bytes were originally there in the message, before the encoding
103      * started.
104      */
105     private int expectedLen;
106 
107     /* Exposed for testing purposes */
createIntermediateBuffer(int minSize)108     protected ByteBuffer createIntermediateBuffer(int minSize) {
109         int capacity = Utils.getIntegerNetProperty(
110                 "jdk.httpclient.websocket.intermediateBufferSize", 16384);
111         return ByteBuffer.allocate(Math.max(minSize, capacity));
112     }
113 
reset()114     public void reset() {
115         // Do not reset the message stream state fields, e.g. previousFin,
116         // previousText. Just an individual message state:
117         started = false;
118         flushing = false;
119         moreText = true;
120         headerCount = 0;
121         actualLen = 0;
122     }
123 
124     /*
125      * Encodes text messages by cutting them into fragments of maximum size of
126      * intermediateBuffer.capacity()
127      */
encodeText(CharBuffer src, boolean last, ByteBuffer dst)128     public boolean encodeText(CharBuffer src, boolean last, ByteBuffer dst)
129             throws IOException
130     {
131         if (debug.on()) {
132             debug.log("encode text src=[pos=%s lim=%s cap=%s] last=%s dst=%s",
133                       src.position(), src.limit(), src.capacity(), last, dst);
134         }
135         if (closed) {
136             throw new IOException("Output closed");
137         }
138         if (!started) {
139             if (!previousText && !previousFin) {
140                 // Previous data message was a partial binary message
141                 throw new IllegalStateException("Unexpected text message");
142             }
143             started = true;
144             headerBuffer.position(0).limit(0);
145             intermediateBuffer.position(0).limit(0);
146             charsetEncoder.reset();
147         }
148         while (true) {
149             if (debug.on()) {
150                 debug.log("put");
151             }
152             if (!putAvailable(headerBuffer, dst)) {
153                 return false;
154             }
155             if (debug.on()) {
156                 debug.log("mask");
157             }
158             if (maskAvailable(intermediateBuffer, dst) < 0) {
159                 return false;
160             }
161             if (debug.on()) {
162                 debug.log("moreText");
163             }
164             if (!moreText) {
165                 previousFin = last;
166                 previousText = true;
167                 return true;
168             }
169             intermediateBuffer.clear();
170             CoderResult r = null;
171             if (!flushing) {
172                 r = charsetEncoder.encode(src, intermediateBuffer, true);
173                 if (r.isUnderflow()) {
174                     flushing = true;
175                 }
176             }
177             if (flushing) {
178                 r = charsetEncoder.flush(intermediateBuffer);
179                 if (r.isUnderflow()) {
180                     moreText = false;
181                 }
182             }
183             if (r.isError()) {
184                 try {
185                     r.throwException();
186                 } catch (CharacterCodingException e) {
187                     throw new IOException("Malformed text message", e);
188                 }
189             }
190             if (debug.on()) {
191                 debug.log("frame #%s", headerCount);
192             }
193             intermediateBuffer.flip();
194             Opcode opcode = previousFin && headerCount == 0
195                     ? Opcode.TEXT : Opcode.CONTINUATION;
196             boolean fin = last && !moreText;
197             setupHeader(opcode, fin, intermediateBuffer.remaining());
198             headerCount++;
199         }
200     }
201 
putAvailable(ByteBuffer src, ByteBuffer dst)202     private boolean putAvailable(ByteBuffer src, ByteBuffer dst) {
203         int available = dst.remaining();
204         if (available >= src.remaining()) {
205             dst.put(src);
206             return true;
207         } else {
208             int lim = src.limit();                   // save the limit
209             src.limit(src.position() + available);
210             dst.put(src);
211             src.limit(lim);                          // restore the limit
212             return false;
213         }
214     }
215 
encodeBinary(ByteBuffer src, boolean last, ByteBuffer dst)216     public boolean encodeBinary(ByteBuffer src, boolean last, ByteBuffer dst)
217             throws IOException
218     {
219         if (debug.on()) {
220             debug.log("encode binary src=%s last=%s dst=%s",
221                       src, last, dst);
222         }
223         if (closed) {
224             throw new IOException("Output closed");
225         }
226         if (!started) {
227             if (previousText && !previousFin) {
228                 // Previous data message was a partial text message
229                 throw new IllegalStateException("Unexpected binary message");
230             }
231             expectedLen = src.remaining();
232             Opcode opcode = previousFin ? Opcode.BINARY : Opcode.CONTINUATION;
233             setupHeader(opcode, last, expectedLen);
234             previousFin = last;
235             previousText = false;
236             started = true;
237         }
238         if (!putAvailable(headerBuffer, dst)) {
239             return false;
240         }
241         int count = maskAvailable(src, dst);
242         actualLen += Math.abs(count);
243         if (count >= 0 && actualLen != expectedLen) {
244             throw new IOException("Concurrent message modification");
245         }
246         return count >= 0;
247     }
248 
maskAvailable(ByteBuffer src, ByteBuffer dst)249     private int maskAvailable(ByteBuffer src, ByteBuffer dst) {
250         int r0 = dst.remaining();
251         payloadMasker.transferMasking(src, dst);
252         int masked = r0 - dst.remaining();
253         return src.hasRemaining() ? -masked : masked;
254     }
255 
encodePing(ByteBuffer src, ByteBuffer dst)256     public boolean encodePing(ByteBuffer src, ByteBuffer dst)
257             throws IOException
258     {
259         if (debug.on()) {
260             debug.log("encode ping src=%s dst=%s", src, dst);
261         }
262         if (closed) {
263             throw new IOException("Output closed");
264         }
265         if (!started) {
266             expectedLen = src.remaining();
267             if (expectedLen > Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH) {
268                 throw new IllegalArgumentException("Long message: " + expectedLen);
269             }
270             setupHeader(Opcode.PING, true, expectedLen);
271             started = true;
272         }
273         if (!putAvailable(headerBuffer, dst)) {
274             return false;
275         }
276         int count = maskAvailable(src, dst);
277         actualLen += Math.abs(count);
278         if (count >= 0 && actualLen != expectedLen) {
279             throw new IOException("Concurrent message modification");
280         }
281         return count >= 0;
282     }
283 
encodePong(ByteBuffer src, ByteBuffer dst)284     public boolean encodePong(ByteBuffer src, ByteBuffer dst)
285             throws IOException
286     {
287         if (debug.on()) {
288             debug.log("encode pong src=%s dst=%s",
289                       src, dst);
290         }
291         if (closed) {
292             throw new IOException("Output closed");
293         }
294         if (!started) {
295             expectedLen = src.remaining();
296             if (expectedLen > Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH) {
297                 throw new IllegalArgumentException("Long message: " + expectedLen);
298             }
299             setupHeader(Opcode.PONG, true, expectedLen);
300             started = true;
301         }
302         if (!putAvailable(headerBuffer, dst)) {
303             return false;
304         }
305         int count = maskAvailable(src, dst);
306         actualLen += Math.abs(count);
307         if (count >= 0 && actualLen != expectedLen) {
308             throw new IOException("Concurrent message modification");
309         }
310         return count >= 0;
311     }
312 
encodeClose(int statusCode, CharBuffer reason, ByteBuffer dst)313     public boolean encodeClose(int statusCode, CharBuffer reason, ByteBuffer dst)
314             throws IOException
315     {
316         if (debug.on()) {
317             debug.log("encode close statusCode=%s reason=[pos=%s lim=%s cap=%s] dst=%s",
318                       statusCode, reason.position(), reason.limit(), reason.capacity(), dst);
319         }
320         if (closed) {
321             throw new IOException("Output closed");
322         }
323         if (!started) {
324             if (debug.on()) {
325                 debug.log("reason [pos=%s lim=%s cap=%s]",
326                           reason.position(), reason.limit(), reason.capacity());
327             }
328             intermediateBuffer.position(0).limit(Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH);
329             intermediateBuffer.putChar((char) statusCode);
330             CoderResult r = charsetEncoder.reset().encode(reason, intermediateBuffer, true);
331             if (r.isUnderflow()) {
332                 if (debug.on()) {
333                     debug.log("flushing");
334                 }
335                 r = charsetEncoder.flush(intermediateBuffer);
336             }
337             if (debug.on()) {
338                 debug.log("encoding result: %s", r);
339             }
340             if (r.isError()) {
341                 try {
342                     r.throwException();
343                 } catch (CharacterCodingException e) {
344                     throw new IOException("Malformed reason", e);
345                 }
346             } else if (r.isOverflow()) {
347                 // Here the 125 bytes size is ensured by the check for overflow
348                 throw new IOException("Long reason");
349             } else if (!r.isUnderflow()) {
350                 throw new InternalError(); // assertion
351             }
352             intermediateBuffer.flip();
353             setupHeader(Opcode.CLOSE, true, intermediateBuffer.remaining());
354             started = true;
355             closed = true;
356             if (debug.on()) {
357                 debug.log("intermediateBuffer=%s", intermediateBuffer);
358             }
359         }
360         if (!putAvailable(headerBuffer, dst)) {
361             return false;
362         }
363         return maskAvailable(intermediateBuffer, dst) >= 0;
364     }
365 
setupHeader(Opcode opcode, boolean fin, long payloadLen)366     private void setupHeader(Opcode opcode, boolean fin, long payloadLen) {
367         if (debug.on()) {
368             debug.log("frame opcode=%s fin=%s len=%s",
369                       opcode, fin, payloadLen);
370         }
371         headerBuffer.clear();
372         int mask = maskingKeySource.nextInt();
373         headerWriter.fin(fin)
374                     .opcode(opcode)
375                     .payloadLen(payloadLen)
376                     .mask(mask)
377                     .write(headerBuffer);
378         headerBuffer.flip();
379         payloadMasker.mask(mask);
380     }
381 }
382