1 /* 2 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http.websocket; 27 28 import jdk.internal.net.http.common.Logger; 29 import jdk.internal.net.http.common.Utils; 30 import jdk.internal.net.http.websocket.Frame.Opcode; 31 32 import java.io.IOException; 33 import java.nio.ByteBuffer; 34 import java.nio.CharBuffer; 35 import java.nio.charset.CharacterCodingException; 36 import java.nio.charset.CharsetEncoder; 37 import java.nio.charset.CoderResult; 38 import java.nio.charset.CodingErrorAction; 39 import java.nio.charset.StandardCharsets; 40 import java.security.SecureRandom; 41 42 /* 43 * A stateful producer of binary representations of WebSocket messages being 44 * sent from the client to the server. 45 * 46 * An encoding method is given an original message and a byte buffer to put the 47 * resulting bytes to. The method is called until it returns true. Then the 48 * reset method is called. The whole sequence repeats with next message. 49 */ 50 public class MessageEncoder { 51 52 private static final Logger debug = 53 Utils.getWebSocketLogger("[Output]"::toString, Utils.DEBUG_WS); 54 55 private final SecureRandom maskingKeySource = new SecureRandom(); 56 private final Frame.HeaderWriter headerWriter = new Frame.HeaderWriter(); 57 private final Frame.Masker payloadMasker = new Frame.Masker(); 58 private final CharsetEncoder charsetEncoder 59 = StandardCharsets.UTF_8.newEncoder() 60 .onMalformedInput(CodingErrorAction.REPORT) 61 .onUnmappableCharacter(CodingErrorAction.REPORT); 62 /* 63 * This buffer is used both to encode characters to UTF-8 and to calculate 64 * the length of the resulting frame's payload. The length of the payload 65 * must be known before the frame's header can be written. 66 * For implementation reasons, this buffer must have a capacity of at least 67 * the maximum size of a Close frame payload, which is 125 bytes 68 * (or Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH). 69 */ 70 private final ByteBuffer intermediateBuffer = createIntermediateBuffer( 71 Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH); 72 private final ByteBuffer headerBuffer = ByteBuffer.allocate( 73 Frame.MAX_HEADER_SIZE_BYTES); 74 75 private boolean started; 76 private boolean flushing; 77 private boolean moreText = true; 78 private long headerCount; 79 /* Has the previous frame got its fin flag set? */ 80 private boolean previousFin = true; 81 /* Was the previous frame TEXT or a CONTINUATION thereof? */ 82 private boolean previousText; 83 private boolean closed; 84 85 /* 86 * How many bytes of the current message have been already encoded. 87 * 88 * Even though the user hands their buffers over to us, they still can 89 * manipulate these buffers while we are getting data out of them. 90 * The number of produced bytes guards us from such behaviour in the 91 * case of messages that must be restricted in size (Ping, Pong and Close). 92 * For other messages this measure provides a best-effort attempt to detect 93 * concurrent changes to buffer. 94 * 95 * Making a shallow copy (duplicate/slice) and then checking the size 96 * precondition on it would also solve the problem, but at the cost of this 97 * extra copy. 98 */ 99 private int actualLen; 100 101 /* 102 * How many bytes were originally there in the message, before the encoding 103 * started. 104 */ 105 private int expectedLen; 106 107 /* Exposed for testing purposes */ createIntermediateBuffer(int minSize)108 protected ByteBuffer createIntermediateBuffer(int minSize) { 109 int capacity = Utils.getIntegerNetProperty( 110 "jdk.httpclient.websocket.intermediateBufferSize", 16384); 111 return ByteBuffer.allocate(Math.max(minSize, capacity)); 112 } 113 reset()114 public void reset() { 115 // Do not reset the message stream state fields, e.g. previousFin, 116 // previousText. Just an individual message state: 117 started = false; 118 flushing = false; 119 moreText = true; 120 headerCount = 0; 121 actualLen = 0; 122 } 123 124 /* 125 * Encodes text messages by cutting them into fragments of maximum size of 126 * intermediateBuffer.capacity() 127 */ encodeText(CharBuffer src, boolean last, ByteBuffer dst)128 public boolean encodeText(CharBuffer src, boolean last, ByteBuffer dst) 129 throws IOException 130 { 131 if (debug.on()) { 132 debug.log("encode text src=[pos=%s lim=%s cap=%s] last=%s dst=%s", 133 src.position(), src.limit(), src.capacity(), last, dst); 134 } 135 if (closed) { 136 throw new IOException("Output closed"); 137 } 138 if (!started) { 139 if (!previousText && !previousFin) { 140 // Previous data message was a partial binary message 141 throw new IllegalStateException("Unexpected text message"); 142 } 143 started = true; 144 headerBuffer.position(0).limit(0); 145 intermediateBuffer.position(0).limit(0); 146 charsetEncoder.reset(); 147 } 148 while (true) { 149 if (debug.on()) { 150 debug.log("put"); 151 } 152 if (!putAvailable(headerBuffer, dst)) { 153 return false; 154 } 155 if (debug.on()) { 156 debug.log("mask"); 157 } 158 if (maskAvailable(intermediateBuffer, dst) < 0) { 159 return false; 160 } 161 if (debug.on()) { 162 debug.log("moreText"); 163 } 164 if (!moreText) { 165 previousFin = last; 166 previousText = true; 167 return true; 168 } 169 intermediateBuffer.clear(); 170 CoderResult r = null; 171 if (!flushing) { 172 r = charsetEncoder.encode(src, intermediateBuffer, true); 173 if (r.isUnderflow()) { 174 flushing = true; 175 } 176 } 177 if (flushing) { 178 r = charsetEncoder.flush(intermediateBuffer); 179 if (r.isUnderflow()) { 180 moreText = false; 181 } 182 } 183 if (r.isError()) { 184 try { 185 r.throwException(); 186 } catch (CharacterCodingException e) { 187 throw new IOException("Malformed text message", e); 188 } 189 } 190 if (debug.on()) { 191 debug.log("frame #%s", headerCount); 192 } 193 intermediateBuffer.flip(); 194 Opcode opcode = previousFin && headerCount == 0 195 ? Opcode.TEXT : Opcode.CONTINUATION; 196 boolean fin = last && !moreText; 197 setupHeader(opcode, fin, intermediateBuffer.remaining()); 198 headerCount++; 199 } 200 } 201 putAvailable(ByteBuffer src, ByteBuffer dst)202 private boolean putAvailable(ByteBuffer src, ByteBuffer dst) { 203 int available = dst.remaining(); 204 if (available >= src.remaining()) { 205 dst.put(src); 206 return true; 207 } else { 208 int lim = src.limit(); // save the limit 209 src.limit(src.position() + available); 210 dst.put(src); 211 src.limit(lim); // restore the limit 212 return false; 213 } 214 } 215 encodeBinary(ByteBuffer src, boolean last, ByteBuffer dst)216 public boolean encodeBinary(ByteBuffer src, boolean last, ByteBuffer dst) 217 throws IOException 218 { 219 if (debug.on()) { 220 debug.log("encode binary src=%s last=%s dst=%s", 221 src, last, dst); 222 } 223 if (closed) { 224 throw new IOException("Output closed"); 225 } 226 if (!started) { 227 if (previousText && !previousFin) { 228 // Previous data message was a partial text message 229 throw new IllegalStateException("Unexpected binary message"); 230 } 231 expectedLen = src.remaining(); 232 Opcode opcode = previousFin ? Opcode.BINARY : Opcode.CONTINUATION; 233 setupHeader(opcode, last, expectedLen); 234 previousFin = last; 235 previousText = false; 236 started = true; 237 } 238 if (!putAvailable(headerBuffer, dst)) { 239 return false; 240 } 241 int count = maskAvailable(src, dst); 242 actualLen += Math.abs(count); 243 if (count >= 0 && actualLen != expectedLen) { 244 throw new IOException("Concurrent message modification"); 245 } 246 return count >= 0; 247 } 248 maskAvailable(ByteBuffer src, ByteBuffer dst)249 private int maskAvailable(ByteBuffer src, ByteBuffer dst) { 250 int r0 = dst.remaining(); 251 payloadMasker.transferMasking(src, dst); 252 int masked = r0 - dst.remaining(); 253 return src.hasRemaining() ? -masked : masked; 254 } 255 encodePing(ByteBuffer src, ByteBuffer dst)256 public boolean encodePing(ByteBuffer src, ByteBuffer dst) 257 throws IOException 258 { 259 if (debug.on()) { 260 debug.log("encode ping src=%s dst=%s", src, dst); 261 } 262 if (closed) { 263 throw new IOException("Output closed"); 264 } 265 if (!started) { 266 expectedLen = src.remaining(); 267 if (expectedLen > Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH) { 268 throw new IllegalArgumentException("Long message: " + expectedLen); 269 } 270 setupHeader(Opcode.PING, true, expectedLen); 271 started = true; 272 } 273 if (!putAvailable(headerBuffer, dst)) { 274 return false; 275 } 276 int count = maskAvailable(src, dst); 277 actualLen += Math.abs(count); 278 if (count >= 0 && actualLen != expectedLen) { 279 throw new IOException("Concurrent message modification"); 280 } 281 return count >= 0; 282 } 283 encodePong(ByteBuffer src, ByteBuffer dst)284 public boolean encodePong(ByteBuffer src, ByteBuffer dst) 285 throws IOException 286 { 287 if (debug.on()) { 288 debug.log("encode pong src=%s dst=%s", 289 src, dst); 290 } 291 if (closed) { 292 throw new IOException("Output closed"); 293 } 294 if (!started) { 295 expectedLen = src.remaining(); 296 if (expectedLen > Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH) { 297 throw new IllegalArgumentException("Long message: " + expectedLen); 298 } 299 setupHeader(Opcode.PONG, true, expectedLen); 300 started = true; 301 } 302 if (!putAvailable(headerBuffer, dst)) { 303 return false; 304 } 305 int count = maskAvailable(src, dst); 306 actualLen += Math.abs(count); 307 if (count >= 0 && actualLen != expectedLen) { 308 throw new IOException("Concurrent message modification"); 309 } 310 return count >= 0; 311 } 312 encodeClose(int statusCode, CharBuffer reason, ByteBuffer dst)313 public boolean encodeClose(int statusCode, CharBuffer reason, ByteBuffer dst) 314 throws IOException 315 { 316 if (debug.on()) { 317 debug.log("encode close statusCode=%s reason=[pos=%s lim=%s cap=%s] dst=%s", 318 statusCode, reason.position(), reason.limit(), reason.capacity(), dst); 319 } 320 if (closed) { 321 throw new IOException("Output closed"); 322 } 323 if (!started) { 324 if (debug.on()) { 325 debug.log("reason [pos=%s lim=%s cap=%s]", 326 reason.position(), reason.limit(), reason.capacity()); 327 } 328 intermediateBuffer.position(0).limit(Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH); 329 intermediateBuffer.putChar((char) statusCode); 330 CoderResult r = charsetEncoder.reset().encode(reason, intermediateBuffer, true); 331 if (r.isUnderflow()) { 332 if (debug.on()) { 333 debug.log("flushing"); 334 } 335 r = charsetEncoder.flush(intermediateBuffer); 336 } 337 if (debug.on()) { 338 debug.log("encoding result: %s", r); 339 } 340 if (r.isError()) { 341 try { 342 r.throwException(); 343 } catch (CharacterCodingException e) { 344 throw new IOException("Malformed reason", e); 345 } 346 } else if (r.isOverflow()) { 347 // Here the 125 bytes size is ensured by the check for overflow 348 throw new IOException("Long reason"); 349 } else if (!r.isUnderflow()) { 350 throw new InternalError(); // assertion 351 } 352 intermediateBuffer.flip(); 353 setupHeader(Opcode.CLOSE, true, intermediateBuffer.remaining()); 354 started = true; 355 closed = true; 356 if (debug.on()) { 357 debug.log("intermediateBuffer=%s", intermediateBuffer); 358 } 359 } 360 if (!putAvailable(headerBuffer, dst)) { 361 return false; 362 } 363 return maskAvailable(intermediateBuffer, dst) >= 0; 364 } 365 setupHeader(Opcode opcode, boolean fin, long payloadLen)366 private void setupHeader(Opcode opcode, boolean fin, long payloadLen) { 367 if (debug.on()) { 368 debug.log("frame opcode=%s fin=%s len=%s", 369 opcode, fin, payloadLen); 370 } 371 headerBuffer.clear(); 372 int mask = maskingKeySource.nextInt(); 373 headerWriter.fin(fin) 374 .opcode(opcode) 375 .payloadLen(payloadLen) 376 .mask(mask) 377 .write(headerBuffer); 378 headerBuffer.flip(); 379 payloadMasker.mask(mask); 380 } 381 } 382