1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4
5 #include <Ice/WSTransceiver.h>
6 #include <Ice/Endpoint.h>
7 #include <Ice/Connection.h>
8 #include <Ice/ProtocolInstance.h>
9 #include <Ice/HttpParser.h>
10 #include <Ice/Communicator.h>
11 #include <Ice/LoggerUtil.h>
12 #include <Ice/Buffer.h>
13 #include <Ice/LocalException.h>
14 #include <Ice/Base64.h>
15 #include <IceUtil/Random.h>
16 #include <Ice/SHA1.h>
17 #include <IceUtil/StringUtil.h>
18
19 // Python 2.7 under Windows.
20 #if _MSC_VER == 1500
21 typedef unsigned short uint16_t;
22 #else
23 #include <stdint.h>
24 #endif
25
26 #include <climits>
27
28 using namespace std;
29 using namespace Ice;
30 using namespace IceInternal;
31
32 //
33 // WebSocket opcodes
34 //
35 #define OP_CONT 0x0 // Continuation frame
36 #define OP_TEXT 0x1 // Text frame
37 #define OP_DATA 0x2 // Data frame
38 #define OP_RES_0x3 0x3 // Reserved
39 #define OP_RES_0x4 0x4 // Reserved
40 #define OP_RES_0x5 0x5 // Reserved
41 #define OP_RES_0x6 0x6 // Reserved
42 #define OP_RES_0x7 0x7 // Reserved
43 #define OP_CLOSE 0x8 // Connection close
44 #define OP_PING 0x9 // Ping
45 #define OP_PONG 0xA // Pong
46 #define OP_RES_0xB 0xB // Reserved
47 #define OP_RES_0xC 0xC // Reserved
48 #define OP_RES_0xD 0xD // Reserved
49 #define OP_RES_0xE 0xE // Reserved
50 #define OP_RES_0xF 0xF // Reserved
51 #define FLAG_FINAL 0x80 // Last frame
52 #define FLAG_MASKED 0x80 // Payload is masked
53
54 #define CLOSURE_NORMAL 1000
55 #define CLOSURE_SHUTDOWN 1001
56 #define CLOSURE_PROTOCOL_ERROR 1002
57 #define CLOSURE_TOO_BIG 1009
58
59 namespace
60 {
61
62 const string _iceProtocol = "ice.zeroc.com";
63 const string _wsUUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
64
65 //
66 // Rename to avoid conflict with OS 10.10 htonll
67 //
ice_htonll(Long v,Byte * dest)68 void ice_htonll(Long v, Byte* dest)
69 {
70 //
71 // Transfer a 64-bit integer in network (big-endian) order.
72 //
73 #ifdef ICE_BIG_ENDIAN
74 const Byte* src = reinterpret_cast<const Byte*>(&v);
75 *dest++ = *src++;
76 *dest++ = *src++;
77 *dest++ = *src++;
78 *dest++ = *src++;
79 *dest++ = *src++;
80 *dest++ = *src++;
81 *dest++ = *src++;
82 *dest = *src;
83 #else
84 const Byte* src = reinterpret_cast<const Byte*>(&v) + sizeof(Long) - 1;
85 *dest++ = *src--;
86 *dest++ = *src--;
87 *dest++ = *src--;
88 *dest++ = *src--;
89 *dest++ = *src--;
90 *dest++ = *src--;
91 *dest++ = *src--;
92 *dest = *src;
93 #endif
94 }
95
96 //
97 // Rename to avoid conflict with OS 10.10 nlltoh
98 //
ice_nlltoh(const Byte * src)99 Long ice_nlltoh(const Byte* src)
100 {
101 Long v;
102
103 //
104 // Extract a 64-bit integer in network (big-endian) order.
105 //
106 #ifdef ICE_BIG_ENDIAN
107 Byte* dest = reinterpret_cast<Byte*>(&v);
108 *dest++ = *src++;
109 *dest++ = *src++;
110 *dest++ = *src++;
111 *dest++ = *src++;
112 *dest++ = *src++;
113 *dest++ = *src++;
114 *dest++ = *src++;
115 *dest = *src;
116 #else
117 Byte* dest = reinterpret_cast<Byte*>(&v) + sizeof(Long) - 1;
118 *dest-- = *src++;
119 *dest-- = *src++;
120 *dest-- = *src++;
121 *dest-- = *src++;
122 *dest-- = *src++;
123 *dest-- = *src++;
124 *dest-- = *src++;
125 *dest = *src;
126 #endif
127
128 return v;
129 }
130
131 #if defined(ICE_OS_UWP)
htons(Short v)132 Short htons(Short v)
133 {
134 Short result;
135 Byte* dest = reinterpret_cast<Byte*>(&result);
136
137 //
138 // Transfer a short in network (big-endian) order.
139 //
140 #ifdef ICE_BIG_ENDIAN
141 const Byte* src = reinterpret_cast<const Byte*>(&v);
142 *dest++ = *src++;
143 *dest = *src;
144 #else
145 const Byte* src = reinterpret_cast<const Byte*>(&v) + sizeof(Short) - 1;
146 *dest++ = *src--;
147 *dest = *src;
148 #endif
149 return result;
150 }
151
ntohs(Short value)152 Short ntohs(Short value)
153 {
154 const Byte* src = reinterpret_cast<Byte*>(&value);
155 Short v;
156
157 //
158 // Extract a 64-bit integer in network (big-endian) order.
159 //
160 #ifdef ICE_BIG_ENDIAN
161 Byte* dest = reinterpret_cast<Byte*>(&v);
162 *dest++ = *src++;
163 *dest = *src;
164 #else
165 Byte* dest = reinterpret_cast<Byte*>(&v) + sizeof(Short) - 1;
166 *dest-- = *src++;
167 *dest = *src;
168 #endif
169
170 return v;
171 }
172 #endif
173
174 }
175
176 NativeInfoPtr
getNativeInfo()177 IceInternal::WSTransceiver::getNativeInfo()
178 {
179 return _delegate->getNativeInfo();
180 }
181
182 #if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
183 AsyncInfo*
getAsyncInfo(SocketOperation status)184 IceInternal::WSTransceiver::getAsyncInfo(SocketOperation status)
185 {
186 return _delegate->getNativeInfo()->getAsyncInfo(status);
187 }
188 #endif
189
190 SocketOperation
initialize(Buffer & readBuffer,Buffer & writeBuffer)191 IceInternal::WSTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer)
192 {
193 //
194 // Delegate logs exceptions that occur during initialize(), so there's no need to trap them here.
195 //
196 if(_state == StateInitializeDelegate)
197 {
198 SocketOperation op = _delegate->initialize(readBuffer, writeBuffer);
199 if(op != SocketOperationNone)
200 {
201 return op;
202 }
203 _state = StateConnected;
204 }
205
206 try
207 {
208 if(_state == StateConnected)
209 {
210 //
211 // We don't know how much we'll need to read.
212 //
213 _readBuffer.b.resize(_readBufferSize);
214 _readI = _readBuffer.i = _readBuffer.b.begin();
215
216 //
217 // The server waits for the client's upgrade request, the
218 // client sends the upgrade request.
219 //
220 _state = StateUpgradeRequestPending;
221 if(!_incoming)
222 {
223 //
224 // Compose the upgrade request.
225 //
226 ostringstream out;
227 out << "GET " << _resource << " HTTP/1.1\r\n"
228 << "Host: " << _host << "\r\n"
229 << "Upgrade: websocket\r\n"
230 << "Connection: Upgrade\r\n"
231 << "Sec-WebSocket-Protocol: " << _iceProtocol << "\r\n"
232 << "Sec-WebSocket-Version: 13\r\n"
233 << "Sec-WebSocket-Key: ";
234
235 //
236 // The value for Sec-WebSocket-Key is a 16-byte random number,
237 // encoded with Base64.
238 //
239 vector<unsigned char> key(16);
240 IceUtilInternal::generateRandom(reinterpret_cast<char*>(&key[0]), key.size());
241 _key = IceInternal::Base64::encode(key);
242 out << _key << "\r\n\r\n"; // EOM
243
244 string str = out.str();
245 _writeBuffer.b.resize(str.size());
246 memcpy(&_writeBuffer.b[0], str.c_str(), str.size());
247 _writeBuffer.i = _writeBuffer.b.begin();
248 }
249 }
250
251 //
252 // Try to write the client's upgrade request.
253 //
254 if(_state == StateUpgradeRequestPending && !_incoming)
255 {
256 if(_writeBuffer.i < _writeBuffer.b.end())
257 {
258 SocketOperation s = _delegate->write(_writeBuffer);
259 if(s)
260 {
261 return s;
262 }
263 }
264 assert(_writeBuffer.i == _writeBuffer.b.end());
265 _state = StateUpgradeResponsePending;
266 }
267
268 while(true)
269 {
270 if(_readBuffer.i < _readBuffer.b.end())
271 {
272 SocketOperation s = _delegate->read(_readBuffer);
273 if(s == SocketOperationWrite || _readBuffer.i == _readBuffer.b.begin())
274 {
275 return s;
276 }
277 }
278
279 //
280 // Try to read the client's upgrade request or the server's response.
281 //
282 if((_state == StateUpgradeRequestPending && _incoming) ||
283 (_state == StateUpgradeResponsePending && !_incoming))
284 {
285 //
286 // Check if we have enough data for a complete message.
287 //
288 const Ice::Byte* p = _parser->isCompleteMessage(&_readBuffer.b[0], _readBuffer.i);
289 if(!p)
290 {
291 if(_readBuffer.i < _readBuffer.b.end())
292 {
293 return SocketOperationRead;
294 }
295
296 //
297 // Enlarge the buffer and try to read more.
298 //
299 const size_t oldSize = static_cast<size_t>(_readBuffer.i - _readBuffer.b.begin());
300 if(oldSize + 1024 > _instance->messageSizeMax())
301 {
302 throw MemoryLimitException(__FILE__, __LINE__);
303 }
304 _readBuffer.b.resize(oldSize + 1024);
305 _readBuffer.i = _readBuffer.b.begin() + oldSize;
306 continue; // Try again to read the response/request
307 }
308
309 //
310 // Set _readI at the end of the response/request message.
311 //
312 _readI = _readBuffer.b.begin() + (p - &_readBuffer.b[0]);
313 }
314
315 //
316 // We're done, the client's upgrade request or server's response is read.
317 //
318 break;
319 }
320
321 try
322 {
323 //
324 // Parse the client's upgrade request.
325 //
326 if(_state == StateUpgradeRequestPending && _incoming)
327 {
328 if(_parser->parse(&_readBuffer.b[0], _readI))
329 {
330 handleRequest(_writeBuffer);
331 _state = StateUpgradeResponsePending;
332 }
333 else
334 {
335 throw ProtocolException(__FILE__, __LINE__, "incomplete request message");
336 }
337 }
338
339 if(_state == StateUpgradeResponsePending)
340 {
341 if(_incoming)
342 {
343 if(_writeBuffer.i < _writeBuffer.b.end())
344 {
345 SocketOperation s = _delegate->write(_writeBuffer);
346 if(s)
347 {
348 return s;
349 }
350 }
351 }
352 else
353 {
354 //
355 // Parse the server's response
356 //
357 if(_parser->parse(&_readBuffer.b[0], _readI))
358 {
359 handleResponse();
360 }
361 else
362 {
363 throw ProtocolException(__FILE__, __LINE__, "incomplete response message");
364 }
365 }
366 }
367 }
368 catch(const WebSocketException& ex)
369 {
370 throw ProtocolException(__FILE__, __LINE__, ex.reason);
371 }
372
373 _state = StateOpened;
374 _nextState = StateOpened;
375
376 if(_readI < _readBuffer.i)
377 {
378 _delegate->getNativeInfo()->ready(SocketOperationRead, true);
379 }
380 }
381 catch(const Ice::LocalException& ex)
382 {
383 if(_instance->traceLevel() >= 2)
384 {
385 Trace out(_instance->logger(), _instance->traceCategory());
386 out << protocol() << " connection HTTP upgrade request failed\n" << toString() << "\n" << ex;
387 }
388 throw;
389 }
390
391 if(_instance->traceLevel() >= 1)
392 {
393 Trace out(_instance->logger(), _instance->traceCategory());
394 if(_incoming)
395 {
396 out << "accepted " << protocol() << " connection HTTP upgrade request\n" << toString();
397 }
398 else
399 {
400 out << protocol() << " connection HTTP upgrade request accepted\n" << toString();
401 }
402 }
403
404 return SocketOperationNone;
405 }
406
407 SocketOperation
closing(bool initiator,const Ice::LocalException & reason)408 IceInternal::WSTransceiver::closing(bool initiator, const Ice::LocalException& reason)
409 {
410 if(_instance->traceLevel() >= 1)
411 {
412 Trace out(_instance->logger(), _instance->traceCategory());
413 out << "gracefully closing " << protocol() << " connection\n" << toString();
414 }
415
416 State s = _nextState == StateOpened ? _state : _nextState;
417
418 if(s == StateClosingRequestPending && _closingInitiator)
419 {
420 //
421 // If we initiated a close connection but also received a
422 // close connection, we assume we didn't initiated the
423 // connection and we send the close frame now. This is to
424 // ensure that if both peers close the connection at the same
425 // time we don't hang having both peer waiting for the close
426 // frame of the other.
427 //
428 assert(!initiator);
429 _closingInitiator = false;
430 return SocketOperationWrite;
431 }
432 else if(s >= StateClosingRequestPending)
433 {
434 return SocketOperationNone;
435 }
436
437 _closingInitiator = initiator;
438
439 if(dynamic_cast<const Ice::CloseConnectionException*>(&reason))
440 {
441 _closingReason = CLOSURE_NORMAL;
442 }
443 else if(dynamic_cast<const Ice::ObjectAdapterDeactivatedException*>(&reason) ||
444 dynamic_cast<const Ice::CommunicatorDestroyedException*>(&reason))
445 {
446 _closingReason = CLOSURE_SHUTDOWN;
447 }
448 else if(dynamic_cast<const Ice::ProtocolException*>(&reason))
449 {
450 _closingReason = CLOSURE_PROTOCOL_ERROR;
451 }
452 else if(dynamic_cast<const Ice::MemoryLimitException*>(&reason))
453 {
454 _closingReason = CLOSURE_TOO_BIG;
455 }
456
457 if(_state == StateOpened)
458 {
459 _state = StateClosingRequestPending;
460 return initiator ? SocketOperationRead : SocketOperationWrite;
461 }
462 else
463 {
464 _nextState = StateClosingRequestPending;
465 return SocketOperationNone;
466 }
467 }
468
469 void
close()470 IceInternal::WSTransceiver::close()
471 {
472 _delegate->close();
473 _state = StateClosed;
474
475 //
476 // Clear the buffers now instead of waiting for destruction.
477 //
478 if(!_writePending)
479 {
480 _writeBuffer.b.clear();
481 }
482 if(!_readPending)
483 {
484 _readBuffer.b.clear();
485 }
486 }
487
488 SocketOperation
write(Buffer & buf)489 IceInternal::WSTransceiver::write(Buffer& buf)
490 {
491 if(_writePending)
492 {
493 return SocketOperationWrite;
494 }
495
496 if(_state < StateOpened)
497 {
498 if(_state < StateConnected)
499 {
500 return _delegate->write(buf);
501 }
502 else
503 {
504 return _delegate->write(_writeBuffer);
505 }
506 }
507
508 do
509 {
510 if(preWrite(buf))
511 {
512 if(_writeBuffer.i < _writeBuffer.b.end())
513 {
514 SocketOperation s = _delegate->write(_writeBuffer);
515 if(s)
516 {
517 return s;
518 }
519 }
520 else if(_incoming && !buf.b.empty() && _writeState == WriteStatePayload)
521 {
522 SocketOperation s = _delegate->write(buf);
523 if(s)
524 {
525 return s;
526 }
527 }
528 }
529 }
530 while(postWrite(buf));
531
532 if(_state == StateClosingResponsePending && !_closingInitiator)
533 {
534 return SocketOperationRead;
535 }
536 return SocketOperationNone;
537 }
538
539 SocketOperation
read(Buffer & buf)540 IceInternal::WSTransceiver::read(Buffer& buf)
541 {
542 if(_readPending)
543 {
544 return SocketOperationRead;
545 }
546
547 if(_state < StateOpened)
548 {
549 if(_state < StateConnected)
550 {
551 return _delegate->read(buf);
552 }
553 else
554 {
555 if(_delegate->read(_readBuffer) == SocketOperationWrite)
556 {
557 return SocketOperationWrite;
558 }
559 else
560 {
561 return SocketOperationNone;
562 }
563 }
564 }
565
566 //
567 // If we read the full Ice message, handle it before trying
568 // reading anymore data from the WS connection.
569 //
570 if(buf.i == buf.b.end())
571 {
572 if(_readI < _readBuffer.i)
573 {
574 _delegate->getNativeInfo()->ready(SocketOperationRead, true);
575 }
576 return SocketOperationNone;
577 }
578
579 SocketOperation s = SocketOperationNone;
580 do
581 {
582 if(preRead(buf))
583 {
584 if(_readState == ReadStatePayload)
585 {
586 //
587 // If the payload length is smaller than what remains to be read, we read
588 // no more than the payload length. The remaining of the buffer will be
589 // sent over in another frame.
590 //
591 size_t readSz = _readPayloadLength - (buf.i - _readStart); // Already read
592 if(static_cast<size_t>(buf.b.end() - buf.i) > readSz)
593 {
594 size_t size = buf.b.size();
595 buf.b.resize(buf.i - buf.b.begin() + readSz);
596 s = _delegate->read(buf);
597 buf.b.resize(size);
598 }
599 else
600 {
601 s = _delegate->read(buf);
602 }
603 }
604 else
605 {
606 s = _delegate->read(_readBuffer);
607 }
608
609 if(s == SocketOperationWrite)
610 {
611 postRead(buf);
612 return s;
613 }
614 }
615 }
616 while(postRead(buf));
617
618 if(buf.i == buf.b.end())
619 {
620 if(_readI < _readBuffer.i)
621 {
622 _delegate->getNativeInfo()->ready(SocketOperationRead, true);
623 }
624 s = SocketOperationNone;
625 }
626 else
627 {
628 _delegate->getNativeInfo()->ready(SocketOperationRead, false);
629 s = SocketOperationRead;
630 }
631
632 if(((_state == StateClosingRequestPending && !_closingInitiator) ||
633 (_state == StateClosingResponsePending && _closingInitiator) ||
634 _state == StatePingPending ||
635 _state == StatePongPending) &&
636 _writeState == WriteStateHeader)
637 {
638 // We have things to write, ask to be notified when writes are ready.
639 s = static_cast<SocketOperation>(s | SocketOperationWrite);
640 }
641 return s;
642 }
643
644 #if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
645 bool
startWrite(Buffer & buf)646 IceInternal::WSTransceiver::startWrite(Buffer& buf)
647 {
648 _writePending = true;
649 if(_state < StateOpened)
650 {
651 if(_state < StateConnected)
652 {
653 return _delegate->startWrite(buf);
654 }
655 else
656 {
657 return _delegate->startWrite(_writeBuffer);
658 }
659 }
660
661 if(preWrite(buf))
662 {
663 if(_writeBuffer.i < _writeBuffer.b.end())
664 {
665 if(_delegate->startWrite(_writeBuffer))
666 {
667 return buf.b.size() == _writePayloadLength; // Return true only if we've written the whole buffer.
668 }
669 return false;
670 }
671 else
672 {
673 assert(_incoming);
674 return _delegate->startWrite(buf);
675 }
676 }
677 else
678 {
679 _delegate->getNativeInfo()->completed(IceInternal::SocketOperationWrite);
680 return false;
681 }
682 }
683
684 void
finishWrite(Buffer & buf)685 IceInternal::WSTransceiver::finishWrite(Buffer& buf)
686 {
687 _writePending = false;
688
689 if(_state < StateOpened)
690 {
691 if(_state < StateConnected)
692 {
693 _delegate->finishWrite(buf);
694 }
695 else
696 {
697 _delegate->finishWrite(_writeBuffer);
698 }
699 return;
700 }
701
702 if(_writeBuffer.i < _writeBuffer.b.end())
703 {
704 _delegate->finishWrite(_writeBuffer);
705 }
706 else if(!buf.b.empty() && buf.i != buf.b.end())
707 {
708 assert(_incoming);
709 _delegate->finishWrite(buf);
710 }
711
712 if(_state == StateClosed)
713 {
714 _writeBuffer.b.clear();
715 return;
716 }
717
718 postWrite(buf);
719 }
720
721 void
startRead(Buffer & buf)722 IceInternal::WSTransceiver::startRead(Buffer& buf)
723 {
724 _readPending = true;
725 if(_state < StateOpened)
726 {
727 if(_state < StateConnected)
728 {
729 _delegate->startRead(buf);
730 }
731 else
732 {
733 _delegate->startRead(_readBuffer);
734 }
735 return;
736 }
737
738 if(preRead(buf))
739 {
740 if(_readState == ReadStatePayload)
741 {
742 //
743 // If the payload length is smaller than what remains to be read, we read
744 // no more than the payload length. The remaining of the buffer will be
745 // sent over in another frame.
746 //
747 size_t readSz = _readPayloadLength - (buf.i - _readStart);
748 if(static_cast<size_t>(buf.b.end() - buf.i) > readSz)
749 {
750 size_t size = buf.b.size();
751 buf.b.resize(buf.i - buf.b.begin() + readSz);
752 _delegate->startRead(buf);
753 buf.b.resize(size);
754 }
755 else
756 {
757 _delegate->startRead(buf);
758 }
759 }
760 else
761 {
762 _delegate->startRead(_readBuffer);
763 }
764 }
765 else
766 {
767 _delegate->getNativeInfo()->completed(IceInternal::SocketOperationRead);
768 }
769 }
770
771 void
finishRead(Buffer & buf)772 IceInternal::WSTransceiver::finishRead(Buffer& buf)
773 {
774 _readPending = false;
775 if(_state < StateOpened)
776 {
777 if(_state < StateConnected)
778 {
779 _delegate->finishRead(buf);
780 }
781 else
782 {
783 _delegate->finishRead(_readBuffer);
784 }
785 return;
786 }
787
788 if(buf.b.empty() || buf.i == buf.b.end())
789 {
790 // Nothing to do.
791 }
792 else if(_readState == ReadStatePayload)
793 {
794 _delegate->finishRead(buf);
795 }
796 else
797 {
798 _delegate->finishRead(_readBuffer);
799 }
800
801 if(_state == StateClosed)
802 {
803 _readBuffer.b.clear();
804 return;
805 }
806
807 postRead(buf);
808 }
809 #endif
810
811 string
protocol() const812 IceInternal::WSTransceiver::protocol() const
813 {
814 return _instance->protocol();
815 }
816
817 string
toString() const818 IceInternal::WSTransceiver::toString() const
819 {
820 return _delegate->toString();
821 }
822
823 string
toDetailedString() const824 IceInternal::WSTransceiver::toDetailedString() const
825 {
826 return _delegate->toDetailedString();
827 }
828
829 Ice::ConnectionInfoPtr
getInfo() const830 IceInternal::WSTransceiver::getInfo() const
831 {
832 WSConnectionInfoPtr info = ICE_MAKE_SHARED(WSConnectionInfo);
833 info->underlying = _delegate->getInfo();
834 info->headers = _parser->getHeaders();
835 return info;
836 }
837
838 void
checkSendSize(const Buffer & buf)839 IceInternal::WSTransceiver::checkSendSize(const Buffer& buf)
840 {
841 _delegate->checkSendSize(buf);
842 }
843
844 void
setBufferSize(int rcvSize,int sndSize)845 IceInternal::WSTransceiver::setBufferSize(int rcvSize, int sndSize)
846 {
847 _delegate->setBufferSize(rcvSize, sndSize);
848 }
849
WSTransceiver(const ProtocolInstancePtr & instance,const TransceiverPtr & del,const string & host,const string & resource)850 IceInternal::WSTransceiver::WSTransceiver(const ProtocolInstancePtr& instance, const TransceiverPtr& del,
851 const string& host, const string& resource) :
852 _instance(instance),
853 _delegate(del),
854 _host(host),
855 _resource(resource),
856 _incoming(false),
857 _state(StateInitializeDelegate),
858 _parser(new HttpParser),
859 _readState(ReadStateOpcode),
860 _readBufferSize(1024),
861 _readLastFrame(true),
862 _readOpCode(0),
863 _readHeaderLength(0),
864 _readPayloadLength(0),
865 _writeState(WriteStateHeader),
866 _writeBufferSize(16 * 1024),
867 _readPending(false),
868 _writePending(false),
869 _closingInitiator(false),
870 _closingReason(CLOSURE_NORMAL)
871 {
872 //
873 // Use 1KB read and 16KB write buffer sizes. We use 16KB for the
874 // write buffer size because all the data needs to be copied to
875 // the write buffer for the purpose of masking. A 16KB buffer
876 // appears to be a good compromise to reduce the number of socket
877 // write calls and not consume too much memory.
878 //
879 }
880
WSTransceiver(const ProtocolInstancePtr & instance,const TransceiverPtr & del)881 IceInternal::WSTransceiver::WSTransceiver(const ProtocolInstancePtr& instance, const TransceiverPtr& del) :
882 _instance(instance),
883 _delegate(del),
884 _incoming(true),
885 _state(StateInitializeDelegate),
886 _parser(new HttpParser),
887 _readState(ReadStateOpcode),
888 _readBufferSize(1024),
889 _readLastFrame(true),
890 _readOpCode(0),
891 _readHeaderLength(0),
892 _readPayloadLength(0),
893 _writeState(WriteStateHeader),
894 _writeBufferSize(1024),
895 _readPending(false),
896 _writePending(false),
897 _closingInitiator(false),
898 _closingReason(CLOSURE_NORMAL)
899 {
900 //
901 // Use 1KB read and write buffer sizes.
902 //
903 }
904
~WSTransceiver()905 IceInternal::WSTransceiver::~WSTransceiver()
906 {
907 }
908
909 void
handleRequest(Buffer & responseBuffer)910 IceInternal::WSTransceiver::handleRequest(Buffer& responseBuffer)
911 {
912 string val;
913
914 //
915 // HTTP/1.1
916 //
917 if(_parser->versionMajor() != 1 || _parser->versionMinor() != 1)
918 {
919 throw WebSocketException("unsupported HTTP version");
920 }
921
922 //
923 // "An |Upgrade| header field containing the value 'websocket',
924 // treated as an ASCII case-insensitive value."
925 //
926 if(!_parser->getHeader("Upgrade", val, true))
927 {
928 throw WebSocketException("missing value for Upgrade field");
929 }
930 else if(val != "websocket")
931 {
932 throw WebSocketException("invalid value `" + val + "' for Upgrade field");
933 }
934
935 //
936 // "A |Connection| header field that includes the token 'Upgrade',
937 // treated as an ASCII case-insensitive value.
938 //
939 if(!_parser->getHeader("Connection", val, true))
940 {
941 throw WebSocketException("missing value for Connection field");
942 }
943 else if(val.find("upgrade") == string::npos)
944 {
945 throw WebSocketException("invalid value `" + val + "' for Connection field");
946 }
947
948 //
949 // "A |Sec-WebSocket-Version| header field, with a value of 13."
950 //
951 if(!_parser->getHeader("Sec-WebSocket-Version", val, false))
952 {
953 throw WebSocketException("missing value for WebSocket version");
954 }
955 else if(val != "13")
956 {
957 throw WebSocketException("unsupported WebSocket version `" + val + "'");
958 }
959
960 //
961 // "Optionally, a |Sec-WebSocket-Protocol| header field, with a list
962 // of values indicating which protocols the client would like to
963 // speak, ordered by preference."
964 //
965 bool addProtocol = false;
966 if(_parser->getHeader("Sec-WebSocket-Protocol", val, true))
967 {
968 vector<string> protocols;
969 if(!IceUtilInternal::splitString(val, ",", protocols))
970 {
971 throw WebSocketException("invalid value `" + val + "' for WebSocket protocol");
972 }
973 for(vector<string>::iterator p = protocols.begin(); p != protocols.end(); ++p)
974 {
975 if(IceUtilInternal::trim(*p) != _iceProtocol)
976 {
977 throw WebSocketException("unknown value `" + *p + "' for WebSocket protocol");
978 }
979 addProtocol = true;
980 }
981 }
982
983 //
984 // "A |Sec-WebSocket-Key| header field with a base64-encoded
985 // value that, when decoded, is 16 bytes in length."
986 //
987 string key;
988 if(!_parser->getHeader("Sec-WebSocket-Key", key, false))
989 {
990 throw WebSocketException("missing value for WebSocket key");
991 }
992
993 vector<unsigned char> decodedKey = Base64::decode(key);
994 if(decodedKey.size() != 16)
995 {
996 throw WebSocketException("invalid value `" + key + "' for WebSocket key");
997 }
998
999 //
1000 // Retain the target resource.
1001 //
1002 const_cast<string&>(_resource) = _parser->uri();
1003
1004 //
1005 // Compose the response.
1006 //
1007 ostringstream out;
1008 out << "HTTP/1.1 101 Switching Protocols\r\n"
1009 << "Upgrade: websocket\r\n"
1010 << "Connection: Upgrade\r\n";
1011 if(addProtocol)
1012 {
1013 out << "Sec-WebSocket-Protocol: " << _iceProtocol << "\r\n";
1014 }
1015
1016 //
1017 // The response includes:
1018 //
1019 // "A |Sec-WebSocket-Accept| header field. The value of this
1020 // header field is constructed by concatenating /key/, defined
1021 // above in step 4 in Section 4.2.2, with the string "258EAFA5-
1022 // E914-47DA-95CA-C5AB0DC85B11", taking the SHA-1 hash of this
1023 // concatenated value to obtain a 20-byte value and base64-
1024 // encoding (see Section 4 of [RFC4648]) this 20-byte hash.
1025 //
1026 out << "Sec-WebSocket-Accept: ";
1027 string input = key + _wsUUID;
1028 vector<unsigned char> hash;
1029 sha1(reinterpret_cast<const unsigned char*>(&input[0]), input.size(), hash);
1030 out << IceInternal::Base64::encode(hash) << "\r\n" << "\r\n"; // EOM
1031
1032 string str = out.str();
1033 responseBuffer.b.resize(str.size());
1034 memcpy(&responseBuffer.b[0], str.c_str(), str.size());
1035 responseBuffer.i = responseBuffer.b.begin();
1036 }
1037
1038 void
handleResponse()1039 IceInternal::WSTransceiver::handleResponse()
1040 {
1041 string val;
1042
1043 //
1044 // HTTP/1.1
1045 //
1046 if(_parser->versionMajor() != 1 || _parser->versionMinor() != 1)
1047 {
1048 throw WebSocketException("unsupported HTTP version");
1049 }
1050
1051 //
1052 // "If the status code received from the server is not 101, the
1053 // client handles the response per HTTP [RFC2616] procedures. In
1054 // particular, the client might perform authentication if it
1055 // receives a 401 status code; the server might redirect the client
1056 // using a 3xx status code (but clients are not required to follow
1057 // them), etc."
1058 //
1059 if(_parser->status() != 101)
1060 {
1061 ostringstream out;
1062 out << "unexpected status value " << _parser->status();
1063 if(!_parser->reason().empty())
1064 {
1065 out << ":" << endl << _parser->reason();
1066 }
1067 throw WebSocketException(out.str());
1068 }
1069
1070 //
1071 // "If the response lacks an |Upgrade| header field or the |Upgrade|
1072 // header field contains a value that is not an ASCII case-
1073 // insensitive match for the value "websocket", the client MUST
1074 // _Fail the WebSocket Connection_."
1075 //
1076 if(!_parser->getHeader("Upgrade", val, true))
1077 {
1078 throw WebSocketException("missing value for Upgrade field");
1079 }
1080 else if(val != "websocket")
1081 {
1082 throw WebSocketException("invalid value `" + val + "' for Upgrade field");
1083 }
1084
1085 //
1086 // "If the response lacks a |Connection| header field or the
1087 // |Connection| header field doesn't contain a token that is an
1088 // ASCII case-insensitive match for the value "Upgrade", the client
1089 // MUST _Fail the WebSocket Connection_."
1090 //
1091 if(!_parser->getHeader("Connection", val, true))
1092 {
1093 throw WebSocketException("missing value for Connection field");
1094 }
1095 else if(val.find("upgrade") == string::npos)
1096 {
1097 throw WebSocketException("invalid value `" + val + "' for Connection field");
1098 }
1099
1100 //
1101 // "If the response includes a |Sec-WebSocket-Protocol| header field
1102 // and this header field indicates the use of a subprotocol that was
1103 // not present in the client's handshake (the server has indicated a
1104 // subprotocol not requested by the client), the client MUST _Fail
1105 // the WebSocket Connection_."
1106 //
1107 if(_parser->getHeader("Sec-WebSocket-Protocol", val, true) && val != _iceProtocol)
1108 {
1109 throw WebSocketException("invalid value `" + val + "' for WebSocket protocol");
1110 }
1111
1112 //
1113 // "If the response lacks a |Sec-WebSocket-Accept| header field or
1114 // the |Sec-WebSocket-Accept| contains a value other than the
1115 // base64-encoded SHA-1 of the concatenation of the |Sec-WebSocket-
1116 // Key| (as a string, not base64-decoded) with the string "258EAFA5-
1117 // E914-47DA-95CA-C5AB0DC85B11" but ignoring any leading and
1118 // trailing whitespace, the client MUST _Fail the WebSocket
1119 // Connection_."
1120 //
1121 if(!_parser->getHeader("Sec-WebSocket-Accept", val, false))
1122 {
1123 throw WebSocketException("missing value for Sec-WebSocket-Accept");
1124 }
1125 string input = _key + _wsUUID;
1126 vector<unsigned char> hash;
1127 sha1(reinterpret_cast<const unsigned char*>(&input[0]), input.size(), hash);
1128 if(val != IceInternal::Base64::encode(hash))
1129 {
1130 throw WebSocketException("invalid value `" + val + "' for Sec-WebSocket-Accept");
1131 }
1132 }
1133
1134 bool
preRead(Buffer & buf)1135 IceInternal::WSTransceiver::preRead(Buffer& buf)
1136 {
1137 while(true)
1138 {
1139 if(_readState == ReadStateOpcode)
1140 {
1141 //
1142 // Is there enough data available to read the opcode?
1143 //
1144 if(!readBuffered(2))
1145 {
1146 return true;
1147 }
1148
1149 //
1150 // Most-significant bit indicates whether this is the
1151 // last frame. Least-significant four bits hold the
1152 // opcode.
1153 //
1154 unsigned char ch = static_cast<unsigned char>(*_readI++);
1155 _readOpCode = ch & 0xf;
1156
1157 //
1158 // Remember if last frame if we're going to read a data or
1159 // continuation frame, this is only for protocol
1160 // correctness checking purpose.
1161 //
1162 if(_readOpCode == OP_DATA)
1163 {
1164 if(!_readLastFrame)
1165 {
1166 throw ProtocolException(__FILE__, __LINE__, "invalid data frame, no FIN on previous frame");
1167 }
1168 _readLastFrame = (ch & FLAG_FINAL) == FLAG_FINAL;
1169 }
1170 else if(_readOpCode == OP_CONT)
1171 {
1172 if(_readLastFrame)
1173 {
1174 throw ProtocolException(__FILE__, __LINE__, "invalid continuation frame, previous frame FIN set");
1175 }
1176 _readLastFrame = (ch & FLAG_FINAL) == FLAG_FINAL;
1177 }
1178
1179 ch = static_cast<unsigned char>(*_readI++);
1180
1181 //
1182 // Check the MASK bit. Messages sent by a client must be masked;
1183 // messages sent by a server must not be masked.
1184 //
1185 const bool masked = (ch & FLAG_MASKED) == FLAG_MASKED;
1186 if(masked != _incoming)
1187 {
1188 throw ProtocolException(__FILE__, __LINE__, "invalid masking");
1189 }
1190
1191 //
1192 // Extract the payload length, which can have the following values:
1193 //
1194 // 0-125: The payload length
1195 // 126: The subsequent two bytes contain the payload length
1196 // 127: The subsequent eight bytes contain the payload length
1197 //
1198 _readPayloadLength = (ch & 0x7f);
1199 if(_readPayloadLength < 126)
1200 {
1201 _readHeaderLength = 0;
1202 }
1203 else if(_readPayloadLength == 126)
1204 {
1205 _readHeaderLength = 2; // Need to read a 16-bit payload length.
1206 }
1207 else
1208 {
1209 _readHeaderLength = 8; // Need to read a 64-bit payload length.
1210 }
1211 if(masked)
1212 {
1213 _readHeaderLength += 4; // Need to read a 32-bit mask.
1214 }
1215
1216 _readState = ReadStateHeader;
1217 }
1218
1219 if(_readState == ReadStateHeader)
1220 {
1221 //
1222 // Is there enough data available to read the header?
1223 //
1224 if(_readHeaderLength > 0 && !readBuffered(_readHeaderLength))
1225 {
1226 return true;
1227 }
1228
1229 if(_readPayloadLength == 126)
1230 {
1231 _readPayloadLength = static_cast<size_t>(ntohs(*reinterpret_cast<uint16_t*>(_readI)));
1232 _readI += 2;
1233 }
1234 else if(_readPayloadLength == 127)
1235 {
1236 assert(_readPayloadLength == 127);
1237 Long l = ice_nlltoh(_readI);
1238 _readI += 8;
1239 if(l < 0 || l > INT_MAX)
1240 {
1241 ostringstream ostr;
1242 ostr << "invalid WebSocket payload length: " << l;
1243 throw ProtocolException(__FILE__, __LINE__, ostr.str());
1244 }
1245 _readPayloadLength = static_cast<size_t>(l);
1246 }
1247
1248 //
1249 // Read the mask if this is an incoming connection.
1250 //
1251 if(_incoming)
1252 {
1253 assert(_readBuffer.i - _readI >= 4); // We must have needed to read the mask.
1254 memcpy(_readMask, _readI, 4); // Copy the mask.
1255 _readI += 4;
1256 }
1257
1258 switch(_readOpCode)
1259 {
1260 case OP_TEXT: // Text frame
1261 {
1262 throw ProtocolException(__FILE__, __LINE__, "text frames not supported");
1263 }
1264 case OP_DATA: // Data frame
1265 case OP_CONT: // Continuation frame
1266 {
1267 if(_instance->traceLevel() >= 2)
1268 {
1269 Trace out(_instance->logger(), _instance->traceCategory());
1270 out << "received " << protocol() << (_readOpCode == OP_DATA ? " data" : " continuation");
1271 out << " frame with payload length of " << _readPayloadLength;
1272 out << " bytes\n" << toString();
1273 }
1274
1275 if(_readPayloadLength <= 0)
1276 {
1277 throw ProtocolException(__FILE__, __LINE__, "payload length is 0");
1278 }
1279 _readState = ReadStatePayload;
1280 assert(buf.i != buf.b.end());
1281 _readFrameStart = buf.i;
1282 break;
1283 }
1284 case OP_CLOSE: // Connection close
1285 {
1286 if(_instance->traceLevel() >= 2)
1287 {
1288 Trace out(_instance->logger(), _instance->traceCategory());
1289 out << "received " << protocol() << " connection close frame\n" << toString();
1290 }
1291
1292 State s = _nextState == StateOpened ? _state : _nextState;
1293 if(s == StateClosingRequestPending)
1294 {
1295 //
1296 // If we receive a close frame while we were actually
1297 // waiting to send one, change the role and send a
1298 // close frame response.
1299 //
1300 if(!_closingInitiator)
1301 {
1302 _closingInitiator = true;
1303 }
1304 if(_state == StateClosingRequestPending)
1305 {
1306 _state = StateClosingResponsePending;
1307 }
1308 else
1309 {
1310 _nextState = StateClosingResponsePending;
1311 }
1312 return false; // No longer interested in reading
1313 }
1314 else
1315 {
1316 throw ConnectionLostException(__FILE__, __LINE__, 0);
1317 }
1318 }
1319 case OP_PING:
1320 {
1321 if(_instance->traceLevel() >= 2)
1322 {
1323 Trace out(_instance->logger(), _instance->traceCategory());
1324 out << "received " << protocol() << " connection ping frame\n" << toString();
1325 }
1326 _readState = ReadStateControlFrame;
1327 break;
1328 }
1329 case OP_PONG: // Pong
1330 {
1331 if(_instance->traceLevel() >= 2)
1332 {
1333 Trace out(_instance->logger(), _instance->traceCategory());
1334 out << "received " << protocol() << " connection pong frame\n" << toString();
1335 }
1336 _readState = ReadStateControlFrame;
1337 break;
1338 }
1339 default:
1340 {
1341 ostringstream ostr;
1342 ostr << "unsupported opcode: " << _readOpCode;
1343 throw ProtocolException(__FILE__, __LINE__, ostr.str());
1344 }
1345 }
1346 }
1347
1348 if(_readState == ReadStateControlFrame)
1349 {
1350 if(_readPayloadLength > 0 && !readBuffered(_readPayloadLength))
1351 {
1352 return true;
1353 }
1354
1355 if(_readPayloadLength > 0 && _readOpCode == OP_PING)
1356 {
1357 _pingPayload.clear();
1358 _pingPayload.resize(_readPayloadLength);
1359 memcpy(&_pingPayload[0], _readI, _pingPayload.size());
1360 }
1361
1362 _readI += _readPayloadLength;
1363 _readPayloadLength = 0;
1364
1365 if(_readOpCode == OP_PING)
1366 {
1367 if(_state == StateOpened)
1368 {
1369 _state = StatePongPending; // Send pong frame now
1370 }
1371 else if(_nextState < StatePongPending)
1372 {
1373 _nextState = StatePongPending; // Send pong frame next
1374 }
1375 }
1376
1377 //
1378 // We've read the payload of the PING/PONG frame, we're ready
1379 // to read a new frame.
1380 //
1381 _readState = ReadStateOpcode;
1382 }
1383
1384 if(_readState == ReadStatePayload)
1385 {
1386 //
1387 // This must be assigned before the check for the buffer. If the buffer is empty
1388 // or already read, postRead will return false.
1389 //
1390 _readStart = buf.i;
1391
1392 if(buf.b.empty() || buf.i == buf.b.end())
1393 {
1394 return false;
1395 }
1396
1397 size_t n = min(_readBuffer.i - _readI, buf.b.end() - buf.i);
1398
1399 if(n > _readPayloadLength)
1400 {
1401 n = _readPayloadLength;
1402 }
1403 if(n > 0)
1404 {
1405 memcpy(buf.i, _readI, n);
1406 buf.i += n;
1407 _readI += n;
1408 }
1409 //
1410 // Continue reading if we didn't read the full message, otherwise give back
1411 // the control to the connection
1412 //
1413 return buf.i < buf.b.end() && n < _readPayloadLength;
1414 }
1415 }
1416 }
1417
1418 bool
postRead(Buffer & buf)1419 IceInternal::WSTransceiver::postRead(Buffer& buf)
1420 {
1421 if(_readState != ReadStatePayload)
1422 {
1423 return _readStart < _readBuffer.i; // Returns true if data was read.
1424 }
1425
1426 if(_readStart == buf.i)
1427 {
1428 return false; // Nothing was read or nothing to read.
1429 }
1430 assert(_readStart < buf.i);
1431
1432 if(_incoming)
1433 {
1434 //
1435 // Unmask the data we just read.
1436 //
1437 IceInternal::Buffer::Container::iterator p = _readStart;
1438 for(size_t n = _readStart - _readFrameStart; p < buf.i; ++p, ++n)
1439 {
1440 *p ^= _readMask[n % 4];
1441 }
1442 }
1443
1444 _readPayloadLength -= buf.i - _readStart;
1445 _readStart = buf.i;
1446 if(_readPayloadLength == 0)
1447 {
1448 //
1449 // We've read the complete payload, we're ready to read a new frame.
1450 //
1451 _readState = ReadStateOpcode;
1452 }
1453 return buf.i != buf.b.end();
1454 }
1455
1456 bool
preWrite(Buffer & buf)1457 IceInternal::WSTransceiver::preWrite(Buffer& buf)
1458 {
1459 if(_writeState == WriteStateHeader)
1460 {
1461 if(_state == StateOpened)
1462 {
1463 if(buf.b.empty() || buf.i == buf.b.end())
1464 {
1465 return false;
1466 }
1467
1468 assert(buf.i == buf.b.begin());
1469 prepareWriteHeader(OP_DATA, buf.b.size());
1470
1471 _writeState = WriteStatePayload;
1472 }
1473 else if(_state == StatePingPending)
1474 {
1475 prepareWriteHeader(OP_PING, 0); // Don't send any payload
1476
1477 _writeBuffer.b.resize(_writeBuffer.i - _writeBuffer.b.begin());
1478 _writeState = WriteStateControlFrame;
1479 _writeBuffer.i = _writeBuffer.b.begin();
1480 }
1481 else if(_state == StatePongPending)
1482 {
1483 prepareWriteHeader(OP_PONG, _pingPayload.size());
1484 if(_pingPayload.size() > static_cast<size_t>(_writeBuffer.b.end() - _writeBuffer.i))
1485 {
1486 size_t pos = _writeBuffer.i - _writeBuffer.b.begin();
1487 _writeBuffer.b.resize(pos + _pingPayload.size());
1488 _writeBuffer.i = _writeBuffer.b.begin() + pos;
1489 }
1490 memcpy(_writeBuffer.i, &_pingPayload[0], _pingPayload.size());
1491 _writeBuffer.i += _pingPayload.size();
1492 _pingPayload.clear();
1493
1494 _writeBuffer.b.resize(_writeBuffer.i - _writeBuffer.b.begin());
1495 _writeState = WriteStateControlFrame;
1496 _writeBuffer.i = _writeBuffer.b.begin();
1497 }
1498 else if((_state == StateClosingRequestPending && !_closingInitiator) ||
1499 (_state == StateClosingResponsePending && _closingInitiator))
1500 {
1501 prepareWriteHeader(OP_CLOSE, 2);
1502
1503 // Write closing reason
1504 *reinterpret_cast<uint16_t*>(_writeBuffer.i) = htons(static_cast<uint16_t>(_closingReason));
1505 if(!_incoming)
1506 {
1507 *_writeBuffer.i++ ^= _writeMask[0];
1508 *_writeBuffer.i++ ^= _writeMask[1];
1509 }
1510 else
1511 {
1512 _writeBuffer.i += 2;
1513 }
1514
1515 _writeState = WriteStateControlFrame;
1516 _writeBuffer.b.resize(_writeBuffer.i - _writeBuffer.b.begin());
1517 _writeBuffer.i = _writeBuffer.b.begin();
1518 }
1519 else
1520 {
1521 assert(_state != StateClosed);
1522 return false; // Nothing to write in this state
1523 }
1524
1525 _writePayloadLength = 0;
1526 }
1527
1528 if(_writeState == WriteStatePayload)
1529 {
1530 //
1531 // For an outgoing connection, each message must be masked with a random
1532 // 32-bit value, so we copy the entire message into the internal buffer
1533 // for writing. For incoming connections, we just copy the start of the
1534 // message in the internal buffer after the header. If the message is
1535 // larger, the reminder is sent directly from the message buffer to avoid
1536 // copying.
1537 //
1538
1539 if(!_incoming && (_writePayloadLength == 0 || _writeBuffer.i == _writeBuffer.b.end()))
1540 {
1541 if(_writeBuffer.i == _writeBuffer.b.end())
1542 {
1543 _writeBuffer.i = _writeBuffer.b.begin();
1544 }
1545
1546 size_t n = buf.i - buf.b.begin();
1547 for(; n < buf.b.size() && _writeBuffer.i < _writeBuffer.b.end(); ++_writeBuffer.i, ++n)
1548 {
1549 *_writeBuffer.i = buf.b[n] ^ _writeMask[n % 4];
1550 }
1551 _writePayloadLength = n;
1552 if(_writeBuffer.i < _writeBuffer.b.end())
1553 {
1554 _writeBuffer.b.resize(_writeBuffer.i - _writeBuffer.b.begin());
1555 }
1556 _writeBuffer.i = _writeBuffer.b.begin();
1557 }
1558 else if(_writePayloadLength == 0)
1559 {
1560 size_t n = min(_writeBuffer.b.end() - _writeBuffer.i, buf.b.end() - buf.i);
1561 memcpy(_writeBuffer.i, buf.i, n);
1562 _writeBuffer.i += n;
1563 buf.i += n;
1564 _writePayloadLength = n;
1565 if(_writeBuffer.i < _writeBuffer.b.end())
1566 {
1567 _writeBuffer.b.resize(_writeBuffer.i - _writeBuffer.b.begin());
1568 }
1569 _writeBuffer.i = _writeBuffer.b.begin();
1570 }
1571 return true;
1572 }
1573 else
1574 {
1575 return _writeBuffer.i < _writeBuffer.b.end();
1576 }
1577 }
1578
1579 bool
postWrite(Buffer & buf)1580 IceInternal::WSTransceiver::postWrite(Buffer& buf)
1581 {
1582 if(_state > StateOpened && _writeState == WriteStateControlFrame)
1583 {
1584 if(_writeBuffer.i == _writeBuffer.b.end())
1585 {
1586 if(_state == StatePingPending)
1587 {
1588 if(_instance->traceLevel() >= 2)
1589 {
1590 Trace out(_instance->logger(), _instance->traceCategory());
1591 out << "sent " << protocol() << " connection ping frame\n" << toString();
1592 }
1593 }
1594 else if(_state == StatePongPending)
1595 {
1596 if(_instance->traceLevel() >= 2)
1597 {
1598 Trace out(_instance->logger(), _instance->traceCategory());
1599 out << "sent " << protocol() << " connection pong frame\n" << toString();
1600 }
1601 }
1602 else if((_state == StateClosingRequestPending && !_closingInitiator) ||
1603 (_state == StateClosingResponsePending && _closingInitiator))
1604 {
1605 if(_instance->traceLevel() >= 2)
1606 {
1607 Trace out(_instance->logger(), _instance->traceCategory());
1608 out << "sent " << protocol() << " connection close frame\n" << toString();
1609 }
1610
1611 if(_state == StateClosingRequestPending && !_closingInitiator)
1612 {
1613 _writeState = WriteStateHeader;
1614 _state = StateClosingResponsePending;
1615 return false;
1616 }
1617 else
1618 {
1619 throw ConnectionLostException(__FILE__, __LINE__, 0);
1620 }
1621 }
1622 else if(_state == StateClosed)
1623 {
1624 return false;
1625 }
1626
1627 _state = _nextState;
1628 _nextState = StateOpened;
1629 _writeState = WriteStateHeader;
1630 }
1631 else
1632 {
1633 return true;
1634 }
1635 }
1636
1637 if((!_incoming || buf.i == buf.b.begin()) && _writePayloadLength > 0)
1638 {
1639 if(_writeBuffer.i == _writeBuffer.b.end())
1640 {
1641 buf.i = buf.b.begin() + _writePayloadLength;
1642 }
1643 }
1644
1645 if(buf.b.empty() || buf.i == buf.b.end())
1646 {
1647 _writeState = WriteStateHeader;
1648 if(_state == StatePingPending ||
1649 _state == StatePongPending ||
1650 (_state == StateClosingRequestPending && !_closingInitiator) ||
1651 (_state == StateClosingResponsePending && _closingInitiator))
1652 {
1653 return true;
1654 }
1655 }
1656 else if(_state == StateOpened)
1657 {
1658 return true;
1659 }
1660 return false;
1661 }
1662
1663 bool
readBuffered(IceInternal::Buffer::Container::size_type sz)1664 IceInternal::WSTransceiver::readBuffered(IceInternal::Buffer::Container::size_type sz)
1665 {
1666 if(_readI == _readBuffer.i)
1667 {
1668 _readBuffer.b.resize(_readBufferSize);
1669 _readI = _readBuffer.i = _readBuffer.b.begin();
1670 }
1671 else
1672 {
1673 IceInternal::Buffer::Container::size_type available = _readBuffer.i - _readI;
1674 if(available < sz)
1675 {
1676 if(_readI != &_readBuffer.b[0])
1677 {
1678 memmove(&_readBuffer.b[0], _readI, available);
1679 }
1680 _readBuffer.b.resize(max(_readBufferSize, sz));
1681 _readI = _readBuffer.b.begin();
1682 _readBuffer.i = _readI + available;
1683 }
1684 }
1685
1686 _readStart = _readBuffer.i;
1687 if(_readI + sz > _readBuffer.i)
1688 {
1689 return false; // Not enough read.
1690 }
1691 assert(_readBuffer.i > _readI);
1692 return true;
1693 }
1694
1695 void
prepareWriteHeader(Byte opCode,IceInternal::Buffer::Container::size_type payloadLength)1696 IceInternal::WSTransceiver::prepareWriteHeader(Byte opCode, IceInternal::Buffer::Container::size_type payloadLength)
1697 {
1698 //
1699 // We need to prepare the frame header.
1700 //
1701 _writeBuffer.b.resize(_writeBufferSize);
1702 _writeBuffer.i = _writeBuffer.b.begin();
1703
1704 //
1705 // Set the opcode - this is the one and only data frame.
1706 //
1707 *_writeBuffer.i++ = static_cast<Byte>(opCode | FLAG_FINAL);
1708
1709 //
1710 // Set the payload length.
1711 //
1712 if(payloadLength <= 125)
1713 {
1714 *_writeBuffer.i++ = static_cast<Byte>(payloadLength);
1715 }
1716 else if(payloadLength > 125 && payloadLength <= USHRT_MAX)
1717 {
1718 //
1719 // Use an extra 16 bits to encode the payload length.
1720 //
1721 *_writeBuffer.i++ = static_cast<Byte>(126);
1722 *reinterpret_cast<uint16_t*>(_writeBuffer.i) = htons(static_cast<uint16_t>(payloadLength));
1723 _writeBuffer.i += 2;
1724 }
1725 else if(payloadLength > USHRT_MAX)
1726 {
1727 //
1728 // Use an extra 64 bits to encode the payload length.
1729 //
1730 *_writeBuffer.i++ = static_cast<Byte>(127);
1731 ice_htonll(payloadLength, _writeBuffer.i);
1732 _writeBuffer.i += 8;
1733 }
1734
1735 if(!_incoming)
1736 {
1737 //
1738 // Add a random 32-bit mask to every outgoing frame, copy the payload data,
1739 // and apply the mask.
1740 //
1741 _writeBuffer.b[1] |= FLAG_MASKED;
1742 IceUtilInternal::generateRandom(reinterpret_cast<char*>(_writeMask), sizeof(_writeMask));
1743 memcpy(_writeBuffer.i, _writeMask, sizeof(_writeMask));
1744 _writeBuffer.i += sizeof(_writeMask);
1745 }
1746 }
1747