1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4 
5 namespace Ice
6 {
7     using System;
8     using System.Collections.Generic;
9     using System.Diagnostics;
10     using System.Text;
11     using System.Threading;
12     using System.Threading.Tasks;
13     using System.Linq;
14 
15     using Instrumentation;
16     using IceInternal;
17 
18     public sealed class ConnectionI : IceInternal.EventHandler, ResponseHandler, CancellationHandler, Connection
19     {
20         public interface StartCallback
21         {
connectionStartCompleted(ConnectionI connection)22             void connectionStartCompleted(ConnectionI connection);
connectionStartFailed(ConnectionI connection, LocalException ex)23             void connectionStartFailed(ConnectionI connection, LocalException ex);
24         }
25 
26         private class TimeoutCallback : TimerTask
27         {
TimeoutCallback(ConnectionI connection)28             public TimeoutCallback(ConnectionI connection)
29             {
30                 _connection = connection;
31             }
32 
runTimerTask()33             public void runTimerTask()
34             {
35                 _connection.timedOut();
36             }
37 
38             private ConnectionI _connection;
39         }
40 
start(StartCallback callback)41         public void start(StartCallback callback)
42         {
43             try
44             {
45                 lock(this)
46                 {
47                     //
48                     // The connection might already be closed if the communicator was destroyed.
49                     //
50                     if(_state >= StateClosed)
51                     {
52                         Debug.Assert(_exception != null);
53                         throw _exception;
54                     }
55 
56                     if(!initialize(SocketOperation.None) || !validate(SocketOperation.None))
57                     {
58                         _startCallback = callback;
59                         return;
60                     }
61 
62                     //
63                     // We start out in holding state.
64                     //
65                     setState(StateHolding);
66                 }
67             }
68             catch(LocalException ex)
69             {
70                 exception(ex);
71                 callback.connectionStartFailed(this, _exception);
72                 return;
73             }
74 
75             callback.connectionStartCompleted(this);
76         }
77 
startAndWait()78         public void startAndWait()
79         {
80             try
81             {
82                 lock(this)
83                 {
84                     //
85                     // The connection might already be closed if the communicator was destroyed.
86                     //
87                     if(_state >= StateClosed)
88                     {
89                         Debug.Assert(_exception != null);
90                         throw _exception;
91                     }
92 
93                     if(!initialize(SocketOperation.None) || !validate(SocketOperation.None))
94                     {
95                         //
96                         // Wait for the connection to be validated.
97                         //
98                         while(_state <= StateNotValidated)
99                         {
100                             Monitor.Wait(this);
101                         }
102 
103                         if(_state >= StateClosing)
104                         {
105                             Debug.Assert(_exception != null);
106                             throw _exception;
107                         }
108                     }
109 
110                     //
111                     // We start out in holding state.
112                     //
113                     setState(StateHolding);
114                 }
115             }
116             catch(LocalException ex)
117             {
118                 exception(ex);
119                 waitUntilFinished();
120                 return;
121             }
122         }
123 
activate()124         public void activate()
125         {
126             lock(this)
127             {
128                 if(_state <= StateNotValidated)
129                 {
130                     return;
131                 }
132 
133                 if(_acmLastActivity > -1)
134                 {
135                     _acmLastActivity = Time.currentMonotonicTimeMillis();
136                 }
137                 setState(StateActive);
138             }
139         }
140 
hold()141         public void hold()
142         {
143             lock(this)
144             {
145                 if(_state <= StateNotValidated)
146                 {
147                     return;
148                 }
149 
150                 setState(StateHolding);
151             }
152         }
153 
154         // DestructionReason.
155         public const int ObjectAdapterDeactivated = 0;
156         public const int CommunicatorDestroyed = 1;
157 
destroy(int reason)158         public void destroy(int reason)
159         {
160             lock(this)
161             {
162                 switch(reason)
163                 {
164                     case ObjectAdapterDeactivated:
165                     {
166                         setState(StateClosing, new ObjectAdapterDeactivatedException());
167                         break;
168                     }
169 
170                     case CommunicatorDestroyed:
171                     {
172                         setState(StateClosing, new CommunicatorDestroyedException());
173                         break;
174                     }
175                 }
176             }
177         }
178 
close(ConnectionClose mode)179         public void close(ConnectionClose mode)
180         {
181             lock(this)
182             {
183                 if(mode == ConnectionClose.Forcefully)
184                 {
185                     setState(StateClosed, new ConnectionManuallyClosedException(false));
186                 }
187                 else if(mode == ConnectionClose.Gracefully)
188                 {
189                     setState(StateClosing, new ConnectionManuallyClosedException(true));
190                 }
191                 else
192                 {
193                     Debug.Assert(mode == ConnectionClose.GracefullyWithWait);
194 
195                     //
196                     // Wait until all outstanding requests have been completed.
197                     //
198                     while(_asyncRequests.Count != 0)
199                     {
200                         Monitor.Wait(this);
201                     }
202 
203                     setState(StateClosing, new ConnectionManuallyClosedException(true));
204                 }
205             }
206         }
207 
isActiveOrHolding()208         public bool isActiveOrHolding()
209         {
210             lock(this)
211             {
212                 return _state > StateNotValidated && _state < StateClosing;
213             }
214         }
215 
isFinished()216         public bool isFinished()
217         {
218             //
219             // We can use TryLock here, because as long as there are still
220             // threads operating in this connection object, connection
221             // destruction is considered as not yet finished.
222             //
223             if(!Monitor.TryEnter(this))
224             {
225                 return false;
226             }
227 
228             try
229             {
230                 if(_state != StateFinished || _dispatchCount != 0)
231                 {
232                     return false;
233                 }
234 
235                 Debug.Assert(_state == StateFinished);
236                 return true;
237             }
238             finally
239             {
240                 Monitor.Exit(this);
241             }
242         }
243 
throwException()244         public void throwException()
245         {
246             lock(this)
247             {
248                 if(_exception != null)
249                 {
250                     Debug.Assert(_state >= StateClosing);
251                     throw _exception;
252                 }
253             }
254         }
255 
waitUntilHolding()256         public void waitUntilHolding()
257         {
258             lock(this)
259             {
260                 while(_state < StateHolding || _dispatchCount > 0)
261                 {
262                     Monitor.Wait(this);
263                 }
264             }
265         }
266 
waitUntilFinished()267         public void waitUntilFinished()
268         {
269             lock(this)
270             {
271                 //
272                 // We wait indefinitely until the connection is finished and all
273                 // outstanding requests are completed. Otherwise we couldn't
274                 // guarantee that there are no outstanding calls when deactivate()
275                 // is called on the servant locators.
276                 //
277                 while(_state < StateFinished || _dispatchCount > 0)
278                 {
279                     Monitor.Wait(this);
280                 }
281 
282                 Debug.Assert(_state == StateFinished);
283 
284                 //
285                 // Clear the OA. See bug 1673 for the details of why this is necessary.
286                 //
287                 _adapter = null;
288             }
289         }
290 
updateObserver()291         public void updateObserver()
292         {
293             lock(this)
294             {
295                 if(_state < StateNotValidated || _state > StateClosed)
296                 {
297                     return;
298                 }
299 
300                 Debug.Assert(_instance.initializationData().observer != null);
301                 _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(),
302                                                                                           _endpoint,
303                                                                                           toConnectionState(_state),
304                                                                                           _observer);
305                 if(_observer != null)
306                 {
307                     _observer.attach();
308                 }
309                 else
310                 {
311                     _writeStreamPos = -1;
312                     _readStreamPos = -1;
313                 }
314             }
315         }
316 
monitor(long now, ACMConfig acm)317         public void monitor(long now, ACMConfig acm)
318         {
319             lock(this)
320             {
321                 if(_state != StateActive)
322                 {
323                     return;
324                 }
325 
326                 //
327                 // We send a heartbeat if there was no activity in the last
328                 // (timeout / 4) period. Sending a heartbeat sooner than
329                 // really needed is safer to ensure that the receiver will
330                 // receive the heartbeat in time. Sending the heartbeat if
331                 // there was no activity in the last (timeout / 2) period
332                 // isn't enough since monitor() is called only every (timeout
333                 // / 2) period.
334                 //
335                 // Note that this doesn't imply that we are sending 4 heartbeats
336                 // per timeout period because the monitor() method is still only
337                 // called every (timeout / 2) period.
338                 //
339                 if(acm.heartbeat == ACMHeartbeat.HeartbeatAlways ||
340                    (acm.heartbeat != ACMHeartbeat.HeartbeatOff && _writeStream.isEmpty() &&
341                     now >= (_acmLastActivity + acm.timeout / 4)))
342                 {
343                     if(acm.heartbeat != ACMHeartbeat.HeartbeatOnDispatch || _dispatchCount > 0)
344                     {
345                         sendHeartbeatNow();
346                     }
347                 }
348 
349                 if(_readStream.size() > Protocol.headerSize || !_writeStream.isEmpty())
350                 {
351                     //
352                     // If writing or reading, nothing to do, the connection
353                     // timeout will kick-in if writes or reads don't progress.
354                     // This check is necessary because the actitivy timer is
355                     // only set when a message is fully read/written.
356                     //
357                     return;
358                 }
359 
360                 if(acm.close != ACMClose.CloseOff && now >= (_acmLastActivity + acm.timeout))
361                 {
362                     if(acm.close == ACMClose.CloseOnIdleForceful ||
363                        (acm.close != ACMClose.CloseOnIdle && (_asyncRequests.Count > 0)))
364                     {
365                         //
366                         // Close the connection if we didn't receive a heartbeat in
367                         // the last period.
368                         //
369                         setState(StateClosed, new ConnectionTimeoutException());
370                     }
371                     else if(acm.close != ACMClose.CloseOnInvocation &&
372                             _dispatchCount == 0 && _batchRequestQueue.isEmpty()  &&
373                             _asyncRequests.Count == 0)
374                     {
375                         //
376                         // The connection is idle, close it.
377                         //
378                         setState(StateClosing, new ConnectionTimeoutException());
379                     }
380                 }
381             }
382         }
383 
sendAsyncRequest(OutgoingAsyncBase og, bool compress, bool response, int batchRequestNum)384         public int sendAsyncRequest(OutgoingAsyncBase og, bool compress, bool response,
385                                     int batchRequestNum)
386         {
387             OutputStream os = og.getOs();
388 
389             lock(this)
390             {
391                 //
392                 // If the exception is closed before we even have a chance
393                 // to send our request, we always try to send the request
394                 // again.
395                 //
396                 if(_exception != null)
397                 {
398                     throw new RetryException(_exception);
399                 }
400 
401                 Debug.Assert(_state > StateNotValidated);
402                 Debug.Assert(_state < StateClosing);
403 
404                 //
405                 // Ensure the message isn't bigger than what we can send with the
406                 // transport.
407                 //
408                 _transceiver.checkSendSize(os.getBuffer());
409 
410                 //
411                 // Notify the request that it's cancelable with this connection.
412                 // This will throw if the request is canceled.
413                 //
414                 og.cancelable(this);
415                 int requestId = 0;
416                 if(response)
417                 {
418                     //
419                     // Create a new unique request ID.
420                     //
421                     requestId = _nextRequestId++;
422                     if(requestId <= 0)
423                     {
424                         _nextRequestId = 1;
425                         requestId = _nextRequestId++;
426                     }
427 
428                     //
429                     // Fill in the request ID.
430                     //
431                     os.pos(Protocol.headerSize);
432                     os.writeInt(requestId);
433                 }
434                 else if(batchRequestNum > 0)
435                 {
436                     os.pos(Protocol.headerSize);
437                     os.writeInt(batchRequestNum);
438                 }
439 
440                 og.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
441 
442                 int status = OutgoingAsyncBase.AsyncStatusQueued;
443                 try
444                 {
445                     OutgoingMessage message = new OutgoingMessage(og, os, compress, requestId);
446                     status = sendMessage(message);
447                 }
448                 catch(LocalException ex)
449                 {
450                     setState(StateClosed, ex);
451                     Debug.Assert(_exception != null);
452                     throw _exception;
453                 }
454 
455                 if(response)
456                 {
457                     //
458                     // Add to the async requests map.
459                     //
460                     _asyncRequests[requestId] = og;
461                 }
462                 return status;
463             }
464         }
465 
getBatchRequestQueue()466         public BatchRequestQueue getBatchRequestQueue()
467         {
468             return _batchRequestQueue;
469         }
470 
flushBatchRequests(CompressBatch compressBatch)471         public void flushBatchRequests(CompressBatch compressBatch)
472         {
473             try
474             {
475                 var completed = new FlushBatchTaskCompletionCallback();
476                 var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed);
477                 outgoing.invoke(_flushBatchRequests_name, compressBatch, true);
478                 completed.Task.Wait();
479             }
480             catch(AggregateException ex)
481             {
482                 throw ex.InnerException;
483             }
484         }
485 
486         private class ConnectionFlushBatchCompletionCallback : AsyncResultCompletionCallback
487         {
ConnectionFlushBatchCompletionCallback(Connection connection, Communicator communicator, Instance instance, string op, object cookie, AsyncCallback callback)488             public ConnectionFlushBatchCompletionCallback(Connection connection,
489                                                           Communicator communicator,
490                                                           Instance instance,
491                                                           string op,
492                                                           object cookie,
493                                                           AsyncCallback callback)
494                 : base(communicator, instance, op, cookie, callback)
495             {
496                 _connection = connection;
497             }
498 
getConnection()499             public override Connection getConnection()
500             {
501                 return _connection;
502             }
503 
getCompletedCallback()504             protected override AsyncCallback getCompletedCallback()
505             {
506                 return (AsyncResult result) =>
507                 {
508                     try
509                     {
510                         result.throwLocalException();
511                     }
512                     catch(Exception ex)
513                     {
514                         if(exceptionCallback_ != null)
515                         {
516                             exceptionCallback_.Invoke(ex);
517                         }
518                     }
519                 };
520             }
521 
522             private Connection _connection;
523         }
524 
flushBatchRequestsAsync(CompressBatch compressBatch, IProgress<bool> progress = null, CancellationToken cancel = new CancellationToken())525         public Task flushBatchRequestsAsync(CompressBatch compressBatch,
526                                             IProgress<bool> progress = null,
527                                             CancellationToken cancel = new CancellationToken())
528         {
529             var completed = new FlushBatchTaskCompletionCallback(progress, cancel);
530             var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed);
531             outgoing.invoke(_flushBatchRequests_name, compressBatch, false);
532             return completed.Task;
533         }
534 
begin_flushBatchRequests(CompressBatch compressBatch, AsyncCallback cb = null, object cookie = null)535         public AsyncResult begin_flushBatchRequests(CompressBatch compressBatch,
536                                                     AsyncCallback cb = null,
537                                                     object cookie = null)
538         {
539             var result = new ConnectionFlushBatchCompletionCallback(this, _communicator, _instance,
540                                                                     _flushBatchRequests_name, cookie, cb);
541             var outgoing = new ConnectionFlushBatchAsync(this, _instance, result);
542             outgoing.invoke(_flushBatchRequests_name, compressBatch, false);
543             return result;
544         }
545 
end_flushBatchRequests(AsyncResult r)546         public void end_flushBatchRequests(AsyncResult r)
547         {
548             if(r != null && r.getConnection() != this)
549             {
550                 const string msg = "Connection for call to end_" + _flushBatchRequests_name +
551                                    " does not match connection that was used to call corresponding begin_" +
552                                    _flushBatchRequests_name + " method";
553                 throw new ArgumentException(msg);
554             }
555             AsyncResultI.check(r, _flushBatchRequests_name).wait();
556         }
557 
558         private const string _flushBatchRequests_name = "flushBatchRequests";
559 
setCloseCallback(CloseCallback callback)560         public void setCloseCallback(CloseCallback callback)
561         {
562             lock(this)
563             {
564                 if(_state >= StateClosed)
565                 {
566                     if(callback != null)
567                     {
568                         _threadPool.dispatch(() =>
569                         {
570                             try
571                             {
572                                 callback(this);
573                             }
574                             catch(System.Exception ex)
575                             {
576                                 _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
577                             }
578                         } , this);
579                     }
580                 }
581                 else
582                 {
583                     _closeCallback = callback;
584                 }
585             }
586         }
587 
setHeartbeatCallback(HeartbeatCallback callback)588         public void setHeartbeatCallback(HeartbeatCallback callback)
589         {
590             lock(this)
591             {
592                 if(_state >= StateClosed)
593                 {
594                     return;
595                 }
596                 _heartbeatCallback = callback;
597             }
598         }
599 
heartbeat()600         public void heartbeat()
601         {
602             heartbeatAsync().Wait();
603         }
604 
605         private class HeartbeatCompletionCallback : AsyncResultCompletionCallback
606         {
HeartbeatCompletionCallback(Ice.Connection connection, Ice.Communicator communicator, Instance instance, object cookie, Ice.AsyncCallback callback)607             public HeartbeatCompletionCallback(Ice.Connection connection,
608                                                Ice.Communicator communicator,
609                                                Instance instance,
610                                                object cookie,
611                                                Ice.AsyncCallback callback)
612                 : base(communicator, instance, "heartbeat", cookie, callback)
613             {
614                 _connection = connection;
615             }
616 
getConnection()617             public override Ice.Connection getConnection()
618             {
619                 return _connection;
620             }
621 
getCompletedCallback()622             protected override Ice.AsyncCallback getCompletedCallback()
623             {
624                 return (Ice.AsyncResult result) =>
625                 {
626                     try
627                     {
628                         result.throwLocalException();
629                     }
630                     catch(Ice.Exception ex)
631                     {
632                         if(exceptionCallback_ != null)
633                         {
634                             exceptionCallback_.Invoke(ex);
635                         }
636                     }
637                 };
638             }
639 
640             private Ice.Connection _connection;
641         }
642 
643         private class HeartbeatTaskCompletionCallback : TaskCompletionCallback<object>
644         {
HeartbeatTaskCompletionCallback(System.IProgress<bool> progress, CancellationToken cancellationToken)645             public HeartbeatTaskCompletionCallback(System.IProgress<bool> progress,
646                                                    CancellationToken cancellationToken) :
647                 base(progress, cancellationToken)
648             {
649             }
650 
handleInvokeResponse(bool ok, OutgoingAsyncBase og)651             public override void handleInvokeResponse(bool ok, OutgoingAsyncBase og)
652             {
653                 SetResult(null);
654             }
655         }
656 
657         private class HeartbeatAsync : OutgoingAsyncBase
658         {
HeartbeatAsync(Ice.ConnectionI connection, Instance instance, OutgoingAsyncCompletionCallback completionCallback)659             public HeartbeatAsync(Ice.ConnectionI connection,
660                                   Instance instance,
661                                   OutgoingAsyncCompletionCallback completionCallback) :
662                 base(instance, completionCallback)
663             {
664                 _connection = connection;
665             }
666 
invoke()667             public void invoke()
668             {
669                 try
670                 {
671                     os_.writeBlob(IceInternal.Protocol.magic);
672                     ProtocolVersion.ice_write(os_, Ice.Util.currentProtocol);
673                     EncodingVersion.ice_write(os_, Ice.Util.currentProtocolEncoding);
674                     os_.writeByte(IceInternal.Protocol.validateConnectionMsg);
675                     os_.writeByte((byte)0);
676                     os_.writeInt(IceInternal.Protocol.headerSize); // Message size.
677 
678                     int status = _connection.sendAsyncRequest(this, false, false, 0);
679 
680                     if((status & AsyncStatusSent) != 0)
681                     {
682                         sentSynchronously_ = true;
683                         if((status & AsyncStatusInvokeSentCallback) != 0)
684                         {
685                             invokeSent();
686                         }
687                     }
688                 }
689                 catch(RetryException ex)
690                 {
691                     try
692                     {
693                         throw ex.get();
694                     }
695                     catch(Ice.LocalException ee)
696                     {
697                         if(exception(ee))
698                         {
699                             invokeExceptionAsync();
700                         }
701                     }
702                 }
703                 catch(Ice.Exception ex)
704                 {
705                     if(exception(ex))
706                     {
707                         invokeExceptionAsync();
708                     }
709                 }
710             }
711 
712             private readonly Ice.ConnectionI _connection;
713         }
714 
heartbeatAsync(IProgress<bool> progress = null, CancellationToken cancel = new CancellationToken())715         public Task heartbeatAsync(IProgress<bool> progress = null, CancellationToken cancel = new CancellationToken())
716         {
717             var completed = new HeartbeatTaskCompletionCallback(progress, cancel);
718             var outgoing = new HeartbeatAsync(this, _instance, completed);
719             outgoing.invoke();
720             return completed.Task;
721         }
722 
begin_heartbeat(AsyncCallback cb = null, object cookie = null)723         public AsyncResult begin_heartbeat(AsyncCallback cb = null, object cookie = null)
724         {
725             var result = new HeartbeatCompletionCallback(this, _communicator, _instance, cookie, cb);
726             var outgoing = new HeartbeatAsync(this, _instance, result);
727             outgoing.invoke();
728             return result;
729         }
730 
end_heartbeat(AsyncResult r)731         public void end_heartbeat(AsyncResult r)
732         {
733             if(r != null && r.getConnection() != this)
734             {
735                 const string msg = "Connection for call to end_heartbeat does not match connection that was used " +
736                     "to call corresponding begin_heartbeat method";
737                 throw new ArgumentException(msg);
738             }
739             AsyncResultI.check(r, "heartbeat").wait();
740         }
741 
setACM(Optional<int> timeout, Optional<ACMClose> close, Optional<ACMHeartbeat> heartbeat)742         public void setACM(Optional<int> timeout, Optional<ACMClose> close, Optional<ACMHeartbeat> heartbeat)
743         {
744             lock(this)
745             {
746                 if(timeout.HasValue && timeout.Value < 0)
747                 {
748                     throw new ArgumentException("invalid negative ACM timeout value");
749                 }
750                 if(_monitor == null || _state >= StateClosed)
751                 {
752                     return;
753                 }
754 
755                 if(_state == StateActive)
756                 {
757                     _monitor.remove(this);
758                 }
759                 _monitor = _monitor.acm(timeout, close, heartbeat);
760 
761                 if(_monitor.getACM().timeout <= 0)
762                 {
763                     _acmLastActivity = -1; // Disable the recording of last activity.
764                 }
765                 else if(_state == StateActive && _acmLastActivity == -1)
766                 {
767                     _acmLastActivity = Time.currentMonotonicTimeMillis();
768                 }
769 
770                 if(_state == StateActive)
771                 {
772                     _monitor.add(this);
773                 }
774             }
775         }
776 
getACM()777         public ACM getACM()
778         {
779             lock(this)
780             {
781                 return _monitor != null ? _monitor.getACM() : new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff);
782             }
783         }
784 
asyncRequestCanceled(OutgoingAsyncBase outAsync, LocalException ex)785         public void asyncRequestCanceled(OutgoingAsyncBase outAsync, LocalException ex)
786         {
787             //
788             // NOTE: This isn't called from a thread pool thread.
789             //
790 
791             lock(this)
792             {
793                 if(_state >= StateClosed)
794                 {
795                     return; // The request has already been or will be shortly notified of the failure.
796                 }
797 
798                 OutgoingMessage o = _sendStreams.FirstOrDefault(m => m.outAsync == outAsync);
799                 if(o != null)
800                 {
801                     if(o.requestId > 0)
802                     {
803                         _asyncRequests.Remove(o.requestId);
804                     }
805 
806                     if(ex is ConnectionTimeoutException)
807                     {
808                         setState(StateClosed, ex);
809                     }
810                     else
811                     {
812                         //
813                         // If the request is being sent, don't remove it from the send streams,
814                         // it will be removed once the sending is finished.
815                         //
816                         if(o == _sendStreams.First.Value)
817                         {
818                             o.canceled();
819                         }
820                         else
821                         {
822                             o.canceled();
823                             _sendStreams.Remove(o);
824                         }
825                         if(outAsync.exception(ex))
826                         {
827                             outAsync.invokeExceptionAsync();
828                         }
829                     }
830                     return;
831                 }
832 
833                 if(outAsync is OutgoingAsync)
834                 {
835                     foreach(KeyValuePair<int, OutgoingAsyncBase> kvp in _asyncRequests)
836                     {
837                         if(kvp.Value == outAsync)
838                         {
839                             if(ex is ConnectionTimeoutException)
840                             {
841                                 setState(StateClosed, ex);
842                             }
843                             else
844                             {
845                                 _asyncRequests.Remove(kvp.Key);
846                                 if(outAsync.exception(ex))
847                                 {
848                                     outAsync.invokeExceptionAsync();
849                                 }
850                             }
851                             return;
852                         }
853                     }
854                 }
855             }
856         }
857 
sendResponse(int requestId, OutputStream os, byte compressFlag, bool amd)858         public void sendResponse(int requestId, OutputStream os, byte compressFlag, bool amd)
859         {
860             lock(this)
861             {
862                 Debug.Assert(_state > StateNotValidated);
863 
864                 try
865                 {
866                     if(--_dispatchCount == 0)
867                     {
868                         if(_state == StateFinished)
869                         {
870                             reap();
871                         }
872                         Monitor.PulseAll(this);
873                     }
874 
875                     if(_state >= StateClosed)
876                     {
877                         Debug.Assert(_exception != null);
878                         throw _exception;
879                     }
880 
881                     sendMessage(new OutgoingMessage(os, compressFlag > 0, true));
882 
883                     if(_state == StateClosing && _dispatchCount == 0)
884                     {
885                         initiateShutdown();
886                     }
887                 }
888                 catch(LocalException ex)
889                 {
890                     setState(StateClosed, ex);
891                 }
892             }
893         }
894 
sendNoResponse()895         public void sendNoResponse()
896         {
897             lock(this)
898             {
899                 Debug.Assert(_state > StateNotValidated);
900 
901                 try
902                 {
903                     if(--_dispatchCount == 0)
904                     {
905                         if(_state == StateFinished)
906                         {
907                             reap();
908                         }
909                         Monitor.PulseAll(this);
910                     }
911 
912                     if(_state >= StateClosed)
913                     {
914                         Debug.Assert(_exception != null);
915                         throw _exception;
916                     }
917 
918                     if(_state == StateClosing && _dispatchCount == 0)
919                     {
920                         initiateShutdown();
921                     }
922                 }
923                 catch(LocalException ex)
924                 {
925                     setState(StateClosed, ex);
926                 }
927             }
928         }
929 
systemException(int requestId, SystemException ex, bool amd)930         public bool systemException(int requestId, SystemException ex, bool amd)
931         {
932             return false; // System exceptions aren't marshalled.
933         }
934 
invokeException(int requestId, LocalException ex, int invokeNum, bool amd)935         public void invokeException(int requestId, LocalException ex, int invokeNum, bool amd)
936         {
937             //
938             // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't
939             // called in case of a fatal exception we decrement _dispatchCount here.
940             //
941 
942             lock(this)
943             {
944                 setState(StateClosed, ex);
945 
946                 if(invokeNum > 0)
947                 {
948                     Debug.Assert(_dispatchCount >= invokeNum);
949                     _dispatchCount -= invokeNum;
950                     if(_dispatchCount == 0)
951                     {
952                         if(_state == StateFinished)
953                         {
954                             reap();
955                         }
956                         Monitor.PulseAll(this);
957                     }
958                 }
959             }
960         }
961 
endpoint()962         public EndpointI endpoint()
963         {
964             return _endpoint; // No mutex protection necessary, _endpoint is immutable.
965         }
966 
connector()967         public Connector connector()
968         {
969             return _connector; // No mutex protection necessary, _endpoint is immutable.
970         }
971 
setAdapter(ObjectAdapter adapter)972         public void setAdapter(ObjectAdapter adapter)
973         {
974             if(adapter != null)
975             {
976                 // Go through the adapter to set the adapter and servant manager on this connection
977                 // to ensure the object adapter is still active.
978                 ((ObjectAdapterI)adapter).setAdapterOnConnection(this);
979             }
980             else
981             {
982                 lock(this)
983                 {
984                     if(_state <= StateNotValidated || _state >= StateClosing)
985                     {
986                         return;
987                     }
988                     _adapter = null;
989                     _servantManager = null;
990                 }
991             }
992 
993             //
994             // We never change the thread pool with which we were initially
995             // registered, even if we add or remove an object adapter.
996             //
997         }
998 
getAdapter()999         public ObjectAdapter getAdapter()
1000         {
1001             lock(this)
1002             {
1003                 return _adapter;
1004             }
1005         }
1006 
getEndpoint()1007         public Endpoint getEndpoint()
1008         {
1009             return _endpoint; // No mutex protection necessary, _endpoint is immutable.
1010         }
1011 
createProxy(Identity ident)1012         public ObjectPrx createProxy(Identity ident)
1013         {
1014             //
1015             // Create a reference and return a reverse proxy for this
1016             // reference.
1017             //
1018             return _instance.proxyFactory().referenceToProxy(_instance.referenceFactory().create(ident, this));
1019         }
1020 
setAdapterAndServantManager(ObjectAdapter adapter, IceInternal.ServantManager servantManager)1021         public void setAdapterAndServantManager(ObjectAdapter adapter, IceInternal.ServantManager servantManager)
1022         {
1023             lock(this)
1024             {
1025                 if(_state <= StateNotValidated || _state >= StateClosing)
1026                 {
1027                     return;
1028                 }
1029                 Debug.Assert(adapter != null); // Called by ObjectAdapterI::setAdapterOnConnection
1030                 _adapter = adapter;
1031                 _servantManager = servantManager;
1032             }
1033         }
1034 
1035         //
1036         // Operations from EventHandler
1037         //
startAsync(int operation, IceInternal.AsyncCallback cb, ref bool completedSynchronously)1038         public override bool startAsync(int operation, IceInternal.AsyncCallback cb, ref bool completedSynchronously)
1039         {
1040             if(_state >= StateClosed)
1041             {
1042                 return false;
1043             }
1044 
1045             try
1046             {
1047                 if((operation & SocketOperation.Write) != 0)
1048                 {
1049                     if(_observer != null)
1050                     {
1051                         observerStartWrite(_writeStream.getBuffer());
1052                     }
1053 
1054                     bool completed;
1055                     completedSynchronously = _transceiver.startWrite(_writeStream.getBuffer(), cb, this, out completed);
1056                     if(completed && _sendStreams.Count > 0)
1057                     {
1058                         // The whole message is written, assume it's sent now for at-most-once semantics.
1059                         _sendStreams.First.Value.isSent = true;
1060                     }
1061                 }
1062                 else if((operation & SocketOperation.Read) != 0)
1063                 {
1064                     if(_observer != null && !_readHeader)
1065                     {
1066                         observerStartRead(_readStream.getBuffer());
1067                     }
1068 
1069                     completedSynchronously = _transceiver.startRead(_readStream.getBuffer(), cb, this);
1070                 }
1071             }
1072             catch(LocalException ex)
1073             {
1074                 setState(StateClosed, ex);
1075                 return false;
1076             }
1077             return true;
1078         }
1079 
finishAsync(int operation)1080         public override bool finishAsync(int operation)
1081         {
1082             try
1083             {
1084                 if((operation & SocketOperation.Write) != 0)
1085                 {
1086                     IceInternal.Buffer buf = _writeStream.getBuffer();
1087                     int start = buf.b.position();
1088                     _transceiver.finishWrite(buf);
1089                     if(_instance.traceLevels().network >= 3 && buf.b.position() != start)
1090                     {
1091                         StringBuilder s = new StringBuilder("sent ");
1092                         s.Append(buf.b.position() - start);
1093                         if(!_endpoint.datagram())
1094                         {
1095                             s.Append(" of ");
1096                             s.Append(buf.b.limit() - start);
1097                         }
1098                         s.Append(" bytes via ");
1099                         s.Append(_endpoint.protocol());
1100                         s.Append("\n");
1101                         s.Append(ToString());
1102                         _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
1103                     }
1104 
1105                     if(_observer != null)
1106                     {
1107                         observerFinishWrite(_writeStream.getBuffer());
1108                     }
1109                 }
1110                 else if((operation & SocketOperation.Read) != 0)
1111                 {
1112                     IceInternal.Buffer buf = _readStream.getBuffer();
1113                     int start = buf.b.position();
1114                     _transceiver.finishRead(buf);
1115                     if(_instance.traceLevels().network >= 3 && buf.b.position() != start)
1116                     {
1117                         StringBuilder s = new StringBuilder("received ");
1118                         if(_endpoint.datagram())
1119                         {
1120                             s.Append(buf.b.limit());
1121                         }
1122                         else
1123                         {
1124                             s.Append(buf.b.position() - start);
1125                             s.Append(" of ");
1126                             s.Append(buf.b.limit() - start);
1127                         }
1128                         s.Append(" bytes via ");
1129                         s.Append(_endpoint.protocol());
1130                         s.Append("\n");
1131                         s.Append(ToString());
1132                         _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
1133                     }
1134 
1135                     if(_observer != null && !_readHeader)
1136                     {
1137                         observerFinishRead(_readStream.getBuffer());
1138                     }
1139                 }
1140             }
1141             catch(LocalException ex)
1142             {
1143                 setState(StateClosed, ex);
1144             }
1145             return _state < StateClosed;
1146         }
1147 
message(ref ThreadPoolCurrent current)1148         public override void message(ref ThreadPoolCurrent current)
1149         {
1150             StartCallback startCB = null;
1151             Queue<OutgoingMessage> sentCBs = null;
1152             MessageInfo info = new MessageInfo();
1153             int dispatchCount = 0;
1154 
1155             ThreadPoolMessage msg = new ThreadPoolMessage(this);
1156             try
1157             {
1158                 lock(this)
1159                 {
1160                     if(!msg.startIOScope(ref current))
1161                     {
1162                         return;
1163                     }
1164 
1165                     if(_state >= StateClosed)
1166                     {
1167                         return;
1168                     }
1169 
1170                     int readyOp = current.operation;
1171                     try
1172                     {
1173                         unscheduleTimeout(current.operation);
1174 
1175                         int writeOp = SocketOperation.None;
1176                         int readOp = SocketOperation.None;
1177                         if((readyOp & SocketOperation.Write) != 0)
1178                         {
1179                             if(_observer != null)
1180                             {
1181                                 observerStartWrite(_writeStream.getBuffer());
1182                             }
1183                             writeOp = write(_writeStream.getBuffer());
1184                             if(_observer != null && (writeOp & SocketOperation.Write) == 0)
1185                             {
1186                                 observerFinishWrite(_writeStream.getBuffer());
1187                             }
1188                         }
1189 
1190                         while((readyOp & SocketOperation.Read) != 0)
1191                         {
1192                             IceInternal.Buffer buf = _readStream.getBuffer();
1193 
1194                             if(_observer != null && !_readHeader)
1195                             {
1196                                 observerStartRead(buf);
1197                             }
1198 
1199                             readOp = read(buf);
1200                             if((readOp & SocketOperation.Read) != 0)
1201                             {
1202                                 break;
1203                             }
1204                             if(_observer != null && !_readHeader)
1205                             {
1206                                 Debug.Assert(!buf.b.hasRemaining());
1207                                 observerFinishRead(buf);
1208                             }
1209 
1210                             if(_readHeader) // Read header if necessary.
1211                             {
1212                                 _readHeader = false;
1213 
1214                                 if(_observer != null)
1215                                 {
1216                                     _observer.receivedBytes(Protocol.headerSize);
1217                                 }
1218 
1219                                 int pos = _readStream.pos();
1220                                 if(pos < Protocol.headerSize)
1221                                 {
1222                                     //
1223                                     // This situation is possible for small UDP packets.
1224                                     //
1225                                     throw new IllegalMessageSizeException();
1226                                 }
1227 
1228                                 _readStream.pos(0);
1229                                 byte[] m = new byte[4];
1230                                 m[0] = _readStream.readByte();
1231                                 m[1] = _readStream.readByte();
1232                                 m[2] = _readStream.readByte();
1233                                 m[3] = _readStream.readByte();
1234                                 if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] ||
1235                                    m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
1236                                 {
1237                                     BadMagicException ex = new BadMagicException();
1238                                     ex.badMagic = m;
1239                                     throw ex;
1240                                 }
1241 
1242                                 ProtocolVersion pv  = new ProtocolVersion();
1243                                 pv.ice_readMembers(_readStream);
1244                                 Protocol.checkSupportedProtocol(pv);
1245                                 EncodingVersion ev = new EncodingVersion();
1246                                 ev.ice_readMembers(_readStream);
1247                                 Protocol.checkSupportedProtocolEncoding(ev);
1248 
1249                                 _readStream.readByte(); // messageType
1250                                 _readStream.readByte(); // compress
1251                                 int size = _readStream.readInt();
1252                                 if(size < Protocol.headerSize)
1253                                 {
1254                                     throw new IllegalMessageSizeException();
1255                                 }
1256                                 if(size > _messageSizeMax)
1257                                 {
1258                                     Ex.throwMemoryLimitException(size, _messageSizeMax);
1259                                 }
1260                                 if(size > _readStream.size())
1261                                 {
1262                                     _readStream.resize(size);
1263                                 }
1264                                 _readStream.pos(pos);
1265                             }
1266 
1267                             if(buf.b.hasRemaining())
1268                             {
1269                                 if(_endpoint.datagram())
1270                                 {
1271                                     throw new DatagramLimitException(); // The message was truncated.
1272                                 }
1273                                 continue;
1274                             }
1275                             break;
1276                         }
1277 
1278                         int newOp = readOp | writeOp;
1279                         readyOp &= ~newOp;
1280                         Debug.Assert(readyOp != 0 || newOp != 0);
1281 
1282                         if(_state <= StateNotValidated)
1283                         {
1284                             if(newOp != 0)
1285                             {
1286                                 //
1287                                 // Wait for all the transceiver conditions to be
1288                                 // satisfied before continuing.
1289                                 //
1290                                 scheduleTimeout(newOp);
1291                                 _threadPool.update(this, current.operation, newOp);
1292                                 return;
1293                             }
1294 
1295                             if(_state == StateNotInitialized && !initialize(current.operation))
1296                             {
1297                                 return;
1298                             }
1299 
1300                             if(_state <= StateNotValidated && !validate(current.operation))
1301                             {
1302                                 return;
1303                             }
1304 
1305                             _threadPool.unregister(this, current.operation);
1306 
1307                             //
1308                             // We start out in holding state.
1309                             //
1310                             setState(StateHolding);
1311                             if(_startCallback != null)
1312                             {
1313                                 startCB = _startCallback;
1314                                 _startCallback = null;
1315                                 if(startCB != null)
1316                                 {
1317                                     ++dispatchCount;
1318                                 }
1319                             }
1320                         }
1321                         else
1322                         {
1323                             Debug.Assert(_state <= StateClosingPending);
1324 
1325                             //
1326                             // We parse messages first, if we receive a close
1327                             // connection message we won't send more messages.
1328                             //
1329                             if((readyOp & SocketOperation.Read) != 0)
1330                             {
1331                                 newOp |= parseMessage(ref info);
1332                                 dispatchCount += info.messageDispatchCount;
1333                             }
1334 
1335                             if((readyOp & SocketOperation.Write) != 0)
1336                             {
1337                                 newOp |= sendNextMessage(out sentCBs);
1338                                 if(sentCBs != null)
1339                                 {
1340                                     ++dispatchCount;
1341                                 }
1342                             }
1343 
1344                             if(_state < StateClosed)
1345                             {
1346                                 scheduleTimeout(newOp);
1347                                 _threadPool.update(this, current.operation, newOp);
1348                             }
1349                         }
1350 
1351                         if(_acmLastActivity > -1)
1352                         {
1353                             _acmLastActivity = Time.currentMonotonicTimeMillis();
1354                         }
1355 
1356                         if(dispatchCount == 0)
1357                         {
1358                             return; // Nothing to dispatch we're done!
1359                         }
1360 
1361                         _dispatchCount += dispatchCount;
1362 
1363                         msg.completed(ref current);
1364                     }
1365                     catch(DatagramLimitException) // Expected.
1366                     {
1367                         if(_warnUdp)
1368                         {
1369                             _logger.warning(string.Format("maximum datagram size of {0} exceeded", _readStream.pos()));
1370                         }
1371                         _readStream.resize(Protocol.headerSize);
1372                         _readStream.pos(0);
1373                         _readHeader = true;
1374                         return;
1375                     }
1376                     catch(SocketException ex)
1377                     {
1378                         setState(StateClosed, ex);
1379                         return;
1380                     }
1381                     catch(LocalException ex)
1382                     {
1383                         if(_endpoint.datagram())
1384                         {
1385                             if(_warn)
1386                             {
1387                                 _logger.warning(string.Format("datagram connection exception:\n{0}\n{1}", ex, _desc));
1388                             }
1389                             _readStream.resize(Protocol.headerSize);
1390                             _readStream.pos(0);
1391                             _readHeader = true;
1392                         }
1393                         else
1394                         {
1395                             setState(StateClosed, ex);
1396                         }
1397                         return;
1398                     }
1399 
1400                     ThreadPoolCurrent c = current;
1401                     _threadPool.dispatch(() =>
1402                     {
1403                         dispatch(startCB, sentCBs, info);
1404                         msg.destroy(ref c);
1405                     }, this);
1406                 }
1407             }
1408             finally
1409             {
1410                 msg.finishIOScope(ref current);
1411             }
1412 
1413         }
1414 
dispatch(StartCallback startCB, Queue<OutgoingMessage> sentCBs, MessageInfo info)1415         private void dispatch(StartCallback startCB, Queue<OutgoingMessage> sentCBs, MessageInfo info)
1416         {
1417             int dispatchedCount = 0;
1418 
1419             //
1420             // Notify the factory that the connection establishment and
1421             // validation has completed.
1422             //
1423             if(startCB != null)
1424             {
1425                 startCB.connectionStartCompleted(this);
1426                 ++dispatchedCount;
1427             }
1428 
1429             //
1430             // Notify AMI calls that the message was sent.
1431             //
1432             if(sentCBs != null)
1433             {
1434                 foreach(OutgoingMessage m in sentCBs)
1435                 {
1436                     if(m.invokeSent)
1437                     {
1438                         m.outAsync.invokeSent();
1439                     }
1440                     if(m.receivedReply)
1441                     {
1442                         OutgoingAsync outAsync = (OutgoingAsync)m.outAsync;
1443                         if(outAsync.response())
1444                         {
1445                             outAsync.invokeResponse();
1446                         }
1447                     }
1448                 }
1449                 ++dispatchedCount;
1450             }
1451 
1452             //
1453             // Asynchronous replies must be handled outside the thread
1454             // synchronization, so that nested calls are possible.
1455             //
1456             if(info.outAsync != null)
1457             {
1458                 info.outAsync.invokeResponse();
1459                 ++dispatchedCount;
1460             }
1461 
1462             if(info.heartbeatCallback != null)
1463             {
1464                 try
1465                 {
1466                     info.heartbeatCallback(this);
1467                 }
1468                 catch(System.Exception ex)
1469                 {
1470                     _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
1471                 }
1472                 ++dispatchedCount;
1473             }
1474 
1475             //
1476             // Method invocation (or multiple invocations for batch messages)
1477             // must be done outside the thread synchronization, so that nested
1478             // calls are possible.
1479             //
1480             if(info.invokeNum > 0)
1481             {
1482                 invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager,
1483                           info.adapter);
1484 
1485                 //
1486                 // Don't increase dispatchedCount, the dispatch count is
1487                 // decreased when the incoming reply is sent.
1488                 //
1489             }
1490 
1491             //
1492             // Decrease dispatch count.
1493             //
1494             if(dispatchedCount > 0)
1495             {
1496                 lock(this)
1497                 {
1498                     _dispatchCount -= dispatchedCount;
1499                     if(_dispatchCount == 0)
1500                     {
1501                         //
1502                         // Only initiate shutdown if not already done. It
1503                         // might have already been done if the sent callback
1504                         // or AMI callback was dispatched when the connection
1505                         // was already in the closing state.
1506                         //
1507                         if(_state == StateClosing)
1508                         {
1509                             try
1510                             {
1511                                 initiateShutdown();
1512                             }
1513                             catch(Ice.LocalException ex)
1514                             {
1515                                 setState(StateClosed, ex);
1516                             }
1517                         }
1518                         else if(_state == StateFinished)
1519                         {
1520                             reap();
1521                         }
1522                         Monitor.PulseAll(this);
1523                     }
1524                 }
1525             }
1526         }
1527 
finished(ref ThreadPoolCurrent current)1528         public override void finished(ref ThreadPoolCurrent current)
1529         {
1530             lock(this)
1531             {
1532                 Debug.Assert(_state == StateClosed);
1533                 unscheduleTimeout(SocketOperation.Read | SocketOperation.Write);
1534             }
1535 
1536             //
1537             // If there are no callbacks to call, we don't call ioCompleted() since we're not going
1538             // to call code that will potentially block (this avoids promoting a new leader and
1539             // unecessary thread creation, especially if this is called on shutdown).
1540             //
1541             if(_startCallback == null && _sendStreams.Count == 0 && _asyncRequests.Count == 0 &&
1542                _closeCallback == null && _heartbeatCallback == null)
1543             {
1544                 finish();
1545                 return;
1546             }
1547 
1548             //
1549             // Unlike C++/Java, this method is called from an IO thread of the .NET thread
1550             // pool of from the communicator async IO thread. While it's fine to handle the
1551             // non-blocking activity of the connection from these threads, the dispatching
1552             // of the message must be taken care of by the Ice thread pool.
1553             //
1554             _threadPool.dispatch(finish, this);
1555         }
1556 
finish()1557         private void finish()
1558         {
1559             if(!_initialized)
1560             {
1561                 if(_instance.traceLevels().network >= 2)
1562                 {
1563                     StringBuilder s = new StringBuilder("failed to ");
1564                     s.Append(_connector != null ? "establish" : "accept");
1565                     s.Append(" ");
1566                     s.Append(_endpoint.protocol());
1567                     s.Append(" connection\n");
1568                     s.Append(ToString());
1569                     s.Append("\n");
1570                     s.Append(_exception);
1571                     _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
1572                 }
1573             }
1574             else
1575             {
1576                 if(_instance.traceLevels().network >= 1)
1577                 {
1578                     StringBuilder s = new StringBuilder("closed ");
1579                     s.Append(_endpoint.protocol());
1580                     s.Append(" connection\n");
1581                     s.Append(ToString());
1582 
1583                     //
1584                     // Trace the cause of unexpected connection closures
1585                     //
1586                     if(!(_exception is CloseConnectionException ||
1587                          _exception is ConnectionManuallyClosedException ||
1588                          _exception is ConnectionTimeoutException ||
1589                          _exception is CommunicatorDestroyedException ||
1590                          _exception is ObjectAdapterDeactivatedException))
1591                     {
1592                         s.Append("\n");
1593                         s.Append(_exception);
1594                     }
1595 
1596                     _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
1597                 }
1598             }
1599 
1600             if(_startCallback != null)
1601             {
1602                 _startCallback.connectionStartFailed(this, _exception);
1603                 _startCallback = null;
1604             }
1605 
1606             if(_sendStreams.Count > 0)
1607             {
1608                 if(!_writeStream.isEmpty())
1609                 {
1610                     //
1611                     // Return the stream to the outgoing call. This is important for
1612                     // retriable AMI calls which are not marshalled again.
1613                     //
1614                     OutgoingMessage message = _sendStreams.First.Value;
1615                     _writeStream.swap(message.stream);
1616 
1617                     //
1618                     // The current message might be sent but not yet removed from _sendStreams. If
1619                     // the response has been received in the meantime, we remove the message from
1620                     // _sendStreams to not call finished on a message which is already done.
1621                     //
1622                     if(message.isSent || message.receivedReply)
1623                     {
1624                         if(message.sent() && message.invokeSent)
1625                         {
1626                             message.outAsync.invokeSent();
1627                         }
1628                         if(message.receivedReply)
1629                         {
1630                             OutgoingAsync outAsync = (OutgoingAsync)message.outAsync;
1631                             if(outAsync.response())
1632                             {
1633                                 outAsync.invokeResponse();
1634                             }
1635                         }
1636                         _sendStreams.RemoveFirst();
1637                     }
1638                 }
1639 
1640                 foreach (OutgoingMessage o in _sendStreams)
1641                 {
1642                     o.completed(_exception);
1643                     if(o.requestId > 0) // Make sure finished isn't called twice.
1644                     {
1645                         _asyncRequests.Remove(o.requestId);
1646                     }
1647                 }
1648                 _sendStreams.Clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage
1649             }
1650 
1651             foreach(OutgoingAsyncBase o in _asyncRequests.Values)
1652             {
1653                 if(o.exception(_exception))
1654                 {
1655                     o.invokeException();
1656                 }
1657             }
1658             _asyncRequests.Clear();
1659 
1660             //
1661             // Don't wait to be reaped to reclaim memory allocated by read/write streams.
1662             //
1663             _writeStream.clear();
1664             _writeStream.getBuffer().clear();
1665             _readStream.clear();
1666             _readStream.getBuffer().clear();
1667             _incomingCache = null;
1668 
1669             if(_closeCallback != null)
1670             {
1671                 try
1672                 {
1673                     _closeCallback(this);
1674                 }
1675                 catch(System.Exception ex)
1676                 {
1677                     _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
1678                 }
1679                 _closeCallback = null;
1680             }
1681 
1682             _heartbeatCallback = null;
1683 
1684             //
1685             // This must be done last as this will cause waitUntilFinished() to return (and communicator
1686             // objects such as the timer might be destroyed too).
1687             //
1688             lock(this)
1689             {
1690                 setState(StateFinished);
1691 
1692                 if(_dispatchCount == 0)
1693                 {
1694                     reap();
1695                 }
1696             }
1697         }
1698 
ToString()1699         public override string ToString()
1700         {
1701             return _desc; // No mutex lock, _desc is immutable.
1702         }
1703 
timedOut()1704         public void timedOut()
1705         {
1706             lock(this)
1707             {
1708                 if(_state <= StateNotValidated)
1709                 {
1710                     setState(StateClosed, new ConnectTimeoutException());
1711                 }
1712                 else if(_state < StateClosing)
1713                 {
1714                     setState(StateClosed, new TimeoutException());
1715                 }
1716                 else if(_state < StateClosed)
1717                 {
1718                     setState(StateClosed, new CloseTimeoutException());
1719                 }
1720             }
1721         }
1722 
type()1723         public string type()
1724         {
1725             return _type; // No mutex lock, _type is immutable.
1726         }
1727 
timeout()1728         public int timeout()
1729         {
1730             return _endpoint.timeout(); // No mutex protection necessary, _endpoint is immutable.
1731         }
1732 
getInfo()1733         public ConnectionInfo getInfo()
1734         {
1735             lock(this)
1736             {
1737                 if(_state >= StateClosed)
1738                 {
1739                     throw _exception;
1740                 }
1741                 return initConnectionInfo();
1742             }
1743         }
1744 
setBufferSize(int rcvSize, int sndSize)1745         public void setBufferSize(int rcvSize, int sndSize)
1746         {
1747             lock(this)
1748             {
1749                 if(_state >= StateClosed)
1750                 {
1751                     throw _exception;
1752                 }
1753                 _transceiver.setBufferSize(rcvSize, sndSize);
1754                 _info = null; // Invalidate the cached connection info
1755             }
1756         }
1757 
ice_toString_()1758         public string ice_toString_()
1759         {
1760             return ToString();
1761         }
1762 
exception(LocalException ex)1763         public void exception(LocalException ex)
1764         {
1765             lock(this)
1766             {
1767                 setState(StateClosed, ex);
1768             }
1769         }
1770 
getThreadPool()1771         public IceInternal.ThreadPool getThreadPool()
1772         {
1773             return _threadPool;
1774         }
1775 
ConnectionI()1776         static ConnectionI()
1777         {
1778             _compressionSupported = IceInternal.BZip2.supported();
1779         }
1780 
ConnectionI(Communicator communicator, Instance instance, ACMMonitor monitor, Transceiver transceiver, Connector connector, EndpointI endpoint, ObjectAdapterI adapter)1781         internal ConnectionI(Communicator communicator, Instance instance, ACMMonitor monitor, Transceiver transceiver,
1782                              Connector connector, EndpointI endpoint, ObjectAdapterI adapter)
1783         {
1784             _communicator = communicator;
1785             _instance = instance;
1786             _monitor = monitor;
1787             _transceiver = transceiver;
1788             _desc = transceiver.ToString();
1789             _type = transceiver.protocol();
1790             _connector = connector;
1791             _endpoint = endpoint;
1792             _adapter = adapter;
1793             InitializationData initData = instance.initializationData();
1794             _logger = initData.logger; // Cached for better performance.
1795             _traceLevels = instance.traceLevels(); // Cached for better performance.
1796             _timer = instance.timer();
1797             _writeTimeout = new TimeoutCallback(this);
1798             _writeTimeoutScheduled = false;
1799             _readTimeout = new TimeoutCallback(this);
1800             _readTimeoutScheduled = false;
1801             _warn = initData.properties.getPropertyAsInt("Ice.Warn.Connections") > 0;
1802             _warnUdp = initData.properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0;
1803             _cacheBuffers = instance.cacheMessageBuffers() > 0;
1804             if(_monitor != null && _monitor.getACM().timeout > 0)
1805             {
1806                 _acmLastActivity = Time.currentMonotonicTimeMillis();
1807             }
1808             else
1809             {
1810                 _acmLastActivity = -1;
1811             }
1812             _nextRequestId = 1;
1813             _messageSizeMax = adapter != null ? adapter.messageSizeMax() : instance.messageSizeMax();
1814             _batchRequestQueue = new BatchRequestQueue(instance, _endpoint.datagram());
1815             _readStream = new InputStream(instance, Util.currentProtocolEncoding);
1816             _readHeader = false;
1817             _readStreamPos = -1;
1818             _writeStream = new OutputStream(instance, Util.currentProtocolEncoding);
1819             _writeStreamPos = -1;
1820             _dispatchCount = 0;
1821             _state = StateNotInitialized;
1822 
1823             _compressionLevel = initData.properties.getPropertyAsIntWithDefault("Ice.Compression.Level", 1);
1824             if(_compressionLevel < 1)
1825             {
1826                 _compressionLevel = 1;
1827             }
1828             else if(_compressionLevel > 9)
1829             {
1830                 _compressionLevel = 9;
1831             }
1832 
1833             if(adapter != null)
1834             {
1835                 _servantManager = adapter.getServantManager();
1836             }
1837 
1838             try
1839             {
1840                 if(adapter != null)
1841                 {
1842                     _threadPool = adapter.getThreadPool();
1843                 }
1844                 else
1845                 {
1846                     _threadPool = instance.clientThreadPool();
1847                 }
1848                 _threadPool.initialize(this);
1849             }
1850             catch(LocalException)
1851             {
1852                 throw;
1853             }
1854             catch(System.Exception ex)
1855             {
1856                 throw new SyscallException(ex);
1857             }
1858         }
1859 
1860         private const int StateNotInitialized = 0;
1861         private const int StateNotValidated = 1;
1862         private const int StateActive = 2;
1863         private const int StateHolding = 3;
1864         private const int StateClosing = 4;
1865         private const int StateClosingPending = 5;
1866         private const int StateClosed = 6;
1867         private const int StateFinished = 7;
1868 
setState(int state, LocalException ex)1869         private void setState(int state, LocalException ex)
1870         {
1871             //
1872             // If setState() is called with an exception, then only closed
1873             // and closing states are permissible.
1874             //
1875             Debug.Assert(state >= StateClosing);
1876 
1877             if(_state == state) // Don't switch twice.
1878             {
1879                 return;
1880             }
1881 
1882             if(_exception == null)
1883             {
1884                 //
1885                 // If we are in closed state, an exception must be set.
1886                 //
1887                 Debug.Assert(_state != StateClosed);
1888 
1889                 _exception = ex;
1890 
1891                 //
1892                 // We don't warn if we are not validated.
1893                 //
1894                 if(_warn && _validated)
1895                 {
1896                     //
1897                     // Don't warn about certain expected exceptions.
1898                     //
1899                     if(!(_exception is CloseConnectionException ||
1900                          _exception is ConnectionManuallyClosedException ||
1901                          _exception is ConnectionTimeoutException ||
1902                          _exception is CommunicatorDestroyedException ||
1903                          _exception is ObjectAdapterDeactivatedException ||
1904                          (_exception is ConnectionLostException && _state >= StateClosing)))
1905                     {
1906                         warning("connection exception", _exception);
1907                     }
1908                 }
1909             }
1910 
1911             //
1912             // We must set the new state before we notify requests of any
1913             // exceptions. Otherwise new requests may retry on a
1914             // connection that is not yet marked as closed or closing.
1915             //
1916             setState(state);
1917         }
1918 
setState(int state)1919         private void setState(int state)
1920         {
1921             //
1922             // We don't want to send close connection messages if the endpoint
1923             // only supports oneway transmission from client to server.
1924             //
1925             if(_endpoint.datagram() && state == StateClosing)
1926             {
1927                 state = StateClosed;
1928             }
1929 
1930             //
1931             // Skip graceful shutdown if we are destroyed before validation.
1932             //
1933             if(_state <= StateNotValidated && state == StateClosing)
1934             {
1935                 state = StateClosed;
1936             }
1937 
1938             if(_state == state) // Don't switch twice.
1939             {
1940                 return;
1941             }
1942 
1943             try
1944             {
1945                 switch(state)
1946                 {
1947                 case StateNotInitialized:
1948                 {
1949                     Debug.Assert(false);
1950                     break;
1951                 }
1952 
1953                 case StateNotValidated:
1954                 {
1955                     if(_state != StateNotInitialized)
1956                     {
1957                         Debug.Assert(_state == StateClosed);
1958                         return;
1959                     }
1960                     break;
1961                 }
1962 
1963                 case StateActive:
1964                 {
1965                     //
1966                     // Can only switch from holding or not validated to
1967                     // active.
1968                     //
1969                     if(_state != StateHolding && _state != StateNotValidated)
1970                     {
1971                         return;
1972                     }
1973                     _threadPool.register(this, SocketOperation.Read);
1974                     break;
1975                 }
1976 
1977                 case StateHolding:
1978                 {
1979                     //
1980                     // Can only switch from active or not validated to
1981                     // holding.
1982                     //
1983                     if(_state != StateActive && _state != StateNotValidated)
1984                     {
1985                         return;
1986                     }
1987                     if(_state == StateActive)
1988                     {
1989                         _threadPool.unregister(this, SocketOperation.Read);
1990                     }
1991                     break;
1992                 }
1993 
1994                 case StateClosing:
1995                 case StateClosingPending:
1996                 {
1997                     //
1998                     // Can't change back from closing pending.
1999                     //
2000                     if(_state >= StateClosingPending)
2001                     {
2002                         return;
2003                     }
2004                     break;
2005                 }
2006 
2007                 case StateClosed:
2008                 {
2009                     if(_state == StateFinished)
2010                     {
2011                         return;
2012                     }
2013 
2014                     _batchRequestQueue.destroy(_exception);
2015                     _threadPool.finish(this);
2016                     _transceiver.close();
2017                     break;
2018                 }
2019 
2020                 case StateFinished:
2021                 {
2022                     Debug.Assert(_state == StateClosed);
2023                     _transceiver.destroy();
2024                     _communicator = null;
2025                     break;
2026                 }
2027                 }
2028             }
2029             catch(LocalException ex)
2030             {
2031                 _logger.error("unexpected connection exception:\n" + ex + "\n" + _transceiver.ToString());
2032             }
2033 
2034             //
2035             // We only register with the connection monitor if our new state
2036             // is StateActive. Otherwise we unregister with the connection
2037             // monitor, but only if we were registered before, i.e., if our
2038             // old state was StateActive.
2039             //
2040             if(_monitor != null)
2041             {
2042                 if(state == StateActive)
2043                 {
2044                     if(_acmLastActivity > -1)
2045                     {
2046                         _acmLastActivity = Time.currentMonotonicTimeMillis();
2047                     }
2048                     _monitor.add(this);
2049                 }
2050                 else if(_state == StateActive)
2051                 {
2052                     _monitor.remove(this);
2053                 }
2054             }
2055 
2056             if(_instance.initializationData().observer != null)
2057             {
2058                 ConnectionState oldState = toConnectionState(_state);
2059                 ConnectionState newState = toConnectionState(state);
2060                 if(oldState != newState)
2061                 {
2062                     _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(),
2063                                                                                               _endpoint,
2064                                                                                               newState,
2065                                                                                               _observer);
2066                     if(_observer != null)
2067                     {
2068                         _observer.attach();
2069                     }
2070                     else
2071                     {
2072                         _writeStreamPos = -1;
2073                         _readStreamPos = -1;
2074                     }
2075                 }
2076                 if(_observer != null && state == StateClosed && _exception != null)
2077                 {
2078                     if(!(_exception is CloseConnectionException ||
2079                          _exception is ConnectionManuallyClosedException ||
2080                          _exception is ConnectionTimeoutException ||
2081                          _exception is CommunicatorDestroyedException ||
2082                          _exception is ObjectAdapterDeactivatedException ||
2083                          (_exception is ConnectionLostException && _state >= StateClosing)))
2084                     {
2085                         _observer.failed(_exception.ice_id());
2086                     }
2087                 }
2088             }
2089             _state = state;
2090 
2091             Monitor.PulseAll(this);
2092 
2093             if(_state == StateClosing && _dispatchCount == 0)
2094             {
2095                 try
2096                 {
2097                     initiateShutdown();
2098                 }
2099                 catch(LocalException ex)
2100                 {
2101                     setState(StateClosed, ex);
2102                 }
2103             }
2104         }
2105 
initiateShutdown()2106         private void initiateShutdown()
2107         {
2108             Debug.Assert(_state == StateClosing && _dispatchCount == 0);
2109 
2110             if(_shutdownInitiated)
2111             {
2112                 return;
2113             }
2114             _shutdownInitiated = true;
2115 
2116             if(!_endpoint.datagram())
2117             {
2118                 //
2119                 // Before we shut down, we send a close connection message.
2120                 //
2121                 OutputStream os = new OutputStream(_instance, Util.currentProtocolEncoding);
2122                 os.writeBlob(Protocol.magic);
2123                 Util.currentProtocol.ice_writeMembers(os);
2124                 Util.currentProtocolEncoding.ice_writeMembers(os);
2125                 os.writeByte(Protocol.closeConnectionMsg);
2126                 os.writeByte(_compressionSupported ? (byte)1 : (byte)0);
2127                 os.writeInt(Protocol.headerSize); // Message size.
2128 
2129                 if((sendMessage(new OutgoingMessage(os, false, false)) & OutgoingAsyncBase.AsyncStatusSent) != 0)
2130                 {
2131                     setState(StateClosingPending);
2132 
2133                     //
2134                     // Notify the transceiver of the graceful connection closure.
2135                     //
2136                     int op = _transceiver.closing(true, _exception);
2137                     if(op != 0)
2138                     {
2139                         scheduleTimeout(op);
2140                         _threadPool.register(this, op);
2141                     }
2142                 }
2143             }
2144         }
2145 
sendHeartbeatNow()2146         private void sendHeartbeatNow()
2147         {
2148             Debug.Assert(_state == StateActive);
2149 
2150             if(!_endpoint.datagram())
2151             {
2152                 OutputStream os = new OutputStream(_instance, Util.currentProtocolEncoding);
2153                 os.writeBlob(Protocol.magic);
2154                 Util.currentProtocol.ice_writeMembers(os);
2155                 Util.currentProtocolEncoding.ice_writeMembers(os);
2156                 os.writeByte(Protocol.validateConnectionMsg);
2157                 os.writeByte(0);
2158                 os.writeInt(Protocol.headerSize); // Message size.
2159                 try
2160                 {
2161                     sendMessage(new OutgoingMessage(os, false, false));
2162                 }
2163                 catch(LocalException ex)
2164                 {
2165                     setState(StateClosed, ex);
2166                     Debug.Assert(_exception != null);
2167                 }
2168             }
2169         }
2170 
initialize(int operation)2171         private bool initialize(int operation)
2172         {
2173             int s = _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer(), ref _hasMoreData);
2174             if(s != SocketOperation.None)
2175             {
2176                 scheduleTimeout(s);
2177                 _threadPool.update(this, operation, s);
2178                 return false;
2179             }
2180 
2181             //
2182             // Update the connection description once the transceiver is initialized.
2183             //
2184             _desc = _transceiver.ToString();
2185             _initialized = true;
2186             setState(StateNotValidated);
2187 
2188             return true;
2189         }
2190 
validate(int operation)2191         private bool validate(int operation)
2192         {
2193             if(!_endpoint.datagram()) // Datagram connections are always implicitly validated.
2194             {
2195                 if(_adapter != null) // The server side has the active role for connection validation.
2196                 {
2197                     if(_writeStream.size() == 0)
2198                     {
2199                         _writeStream.writeBlob(Protocol.magic);
2200                         Util.currentProtocol.ice_writeMembers(_writeStream);
2201                         Util.currentProtocolEncoding.ice_writeMembers(_writeStream);
2202                         _writeStream.writeByte(Protocol.validateConnectionMsg);
2203                         _writeStream.writeByte(0); // Compression status (always zero for validate connection).
2204                         _writeStream.writeInt(Protocol.headerSize); // Message size.
2205                         TraceUtil.traceSend(_writeStream, _logger, _traceLevels);
2206                         _writeStream.prepareWrite();
2207                     }
2208 
2209                     if(_observer != null)
2210                     {
2211                         observerStartWrite(_writeStream.getBuffer());
2212                     }
2213 
2214                     if(_writeStream.pos() != _writeStream.size())
2215                     {
2216                         int op = write(_writeStream.getBuffer());
2217                         if(op != 0)
2218                         {
2219                             scheduleTimeout(op);
2220                             _threadPool.update(this, operation, op);
2221                             return false;
2222                         }
2223                     }
2224 
2225                     if(_observer != null)
2226                     {
2227                         observerFinishWrite(_writeStream.getBuffer());
2228                     }
2229                 }
2230                 else // The client side has the passive role for connection validation.
2231                 {
2232                     if(_readStream.size() == 0)
2233                     {
2234                         _readStream.resize(Protocol.headerSize);
2235                         _readStream.pos(0);
2236                     }
2237 
2238                     if(_observer != null)
2239                     {
2240                         observerStartRead(_readStream.getBuffer());
2241                     }
2242 
2243                     if(_readStream.pos() != _readStream.size())
2244                     {
2245                         int op = read(_readStream.getBuffer());
2246                         if(op != 0)
2247                         {
2248                             scheduleTimeout(op);
2249                             _threadPool.update(this, operation, op);
2250                             return false;
2251                         }
2252                     }
2253 
2254                     if(_observer != null)
2255                     {
2256                         observerFinishRead(_readStream.getBuffer());
2257                     }
2258 
2259                     Debug.Assert(_readStream.pos() == Protocol.headerSize);
2260                     _readStream.pos(0);
2261                     byte[] m = _readStream.readBlob(4);
2262                     if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] ||
2263                        m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
2264                     {
2265                         BadMagicException ex = new BadMagicException();
2266                         ex.badMagic = m;
2267                         throw ex;
2268                     }
2269 
2270                     ProtocolVersion pv  = new ProtocolVersion();
2271                     pv.ice_readMembers(_readStream);
2272                     Protocol.checkSupportedProtocol(pv);
2273 
2274                     EncodingVersion ev = new EncodingVersion();
2275                     ev.ice_readMembers(_readStream);
2276                     Protocol.checkSupportedProtocolEncoding(ev);
2277 
2278                     byte messageType = _readStream.readByte();
2279                     if(messageType != Protocol.validateConnectionMsg)
2280                     {
2281                         throw new ConnectionNotValidatedException();
2282                     }
2283                     _readStream.readByte(); // Ignore compression status for validate connection.
2284                     int size = _readStream.readInt();
2285                     if(size != Protocol.headerSize)
2286                     {
2287                         throw new IllegalMessageSizeException();
2288                     }
2289                     TraceUtil.traceRecv(_readStream, _logger, _traceLevels);
2290 
2291                     _validated = true;
2292                 }
2293             }
2294 
2295             _writeStream.resize(0);
2296             _writeStream.pos(0);
2297 
2298             _readStream.resize(Protocol.headerSize);
2299             _readStream.pos(0);
2300             _readHeader = true;
2301 
2302             if(_instance.traceLevels().network >= 1)
2303             {
2304                 StringBuilder s = new StringBuilder();
2305                 if(_endpoint.datagram())
2306                 {
2307                     s.Append("starting to ");
2308                     s.Append(_connector != null ? "send" : "receive");
2309                     s.Append(" ");
2310                     s.Append(_endpoint.protocol());
2311                     s.Append(" messages\n");
2312                     s.Append(_transceiver.toDetailedString());
2313                 }
2314                 else
2315                 {
2316                     s.Append(_connector != null ? "established" : "accepted");
2317                     s.Append(" ");
2318                     s.Append(_endpoint.protocol());
2319                     s.Append(" connection\n");
2320                     s.Append(ToString());
2321                 }
2322                 _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
2323             }
2324 
2325             return true;
2326         }
2327 
sendNextMessage(out Queue<OutgoingMessage> callbacks)2328         private int sendNextMessage(out Queue<OutgoingMessage> callbacks)
2329         {
2330             callbacks = null;
2331 
2332             if(_sendStreams.Count == 0)
2333             {
2334                 return SocketOperation.None;
2335             }
2336             else if(_state == StateClosingPending && _writeStream.pos() == 0)
2337             {
2338                 // Message wasn't sent, empty the _writeStream, we're not going to send more data.
2339                 OutgoingMessage message = _sendStreams.First.Value;
2340                 _writeStream.swap(message.stream);
2341                 return SocketOperation.None;
2342             }
2343 
2344             Debug.Assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size());
2345             try
2346             {
2347                 while(true)
2348                 {
2349                     //
2350                     // Notify the message that it was sent.
2351                     //
2352                     OutgoingMessage message = _sendStreams.First.Value;
2353                     _writeStream.swap(message.stream);
2354                     if(message.sent())
2355                     {
2356                         if(callbacks == null)
2357                         {
2358                             callbacks = new Queue<OutgoingMessage>();
2359                         }
2360                         callbacks.Enqueue(message);
2361                     }
2362                     _sendStreams.RemoveFirst();
2363 
2364                     //
2365                     // If there's nothing left to send, we're done.
2366                     //
2367                     if(_sendStreams.Count == 0)
2368                     {
2369                         break;
2370                     }
2371 
2372                     //
2373                     // If we are in the closed state or if the close is
2374                     // pending, don't continue sending.
2375                     //
2376                     // This can occur if parseMessage (called before
2377                     // sendNextMessage by message()) closes the connection.
2378                     //
2379                     if(_state >= StateClosingPending)
2380                     {
2381                         return SocketOperation.None;
2382                     }
2383 
2384                     //
2385                     // Otherwise, prepare the next message stream for writing.
2386                     //
2387                     message = _sendStreams.First.Value;
2388                     Debug.Assert(!message.prepared);
2389                     OutputStream stream = message.stream;
2390 
2391                     message.stream = doCompress(message.stream, message.compress);
2392                     message.stream.prepareWrite();
2393                     message.prepared = true;
2394 
2395                     TraceUtil.traceSend(stream, _logger, _traceLevels);
2396                     _writeStream.swap(message.stream);
2397 
2398                     //
2399                     // Send the message.
2400                     //
2401                     if(_observer != null)
2402                     {
2403                         observerStartWrite(_writeStream.getBuffer());
2404                     }
2405                     if(_writeStream.pos() != _writeStream.size())
2406                     {
2407                         int op = write(_writeStream.getBuffer());
2408                         if(op != 0)
2409                         {
2410                             return op;
2411                         }
2412                     }
2413                     if(_observer != null)
2414                     {
2415                         observerFinishWrite(_writeStream.getBuffer());
2416                     }
2417                 }
2418 
2419                 //
2420                 // If all the messages were sent and we are in the closing state, we schedule
2421                 // the close timeout to wait for the peer to close the connection.
2422                 //
2423                 if(_state == StateClosing && _shutdownInitiated)
2424                 {
2425                     setState(StateClosingPending);
2426                     int op = _transceiver.closing(true, _exception);
2427                     if(op != 0)
2428                     {
2429                         return op;
2430                     }
2431                 }
2432             }
2433             catch(LocalException ex)
2434             {
2435                 setState(StateClosed, ex);
2436             }
2437             return SocketOperation.None;
2438         }
2439 
sendMessage(OutgoingMessage message)2440         private int sendMessage(OutgoingMessage message)
2441         {
2442             Debug.Assert(_state < StateClosed);
2443 
2444             if(_sendStreams.Count > 0)
2445             {
2446                 message.adopt();
2447                 _sendStreams.AddLast(message);
2448                 return OutgoingAsyncBase.AsyncStatusQueued;
2449             }
2450 
2451             //
2452             // Attempt to send the message without blocking. If the send blocks, we use
2453             // asynchronous I/O or we request the caller to call finishSendMessage() outside
2454             // the synchronization.
2455             //
2456 
2457             Debug.Assert(!message.prepared);
2458 
2459             OutputStream stream = message.stream;
2460 
2461             message.stream = doCompress(stream, message.compress);
2462             message.stream.prepareWrite();
2463             message.prepared = true;
2464 
2465             TraceUtil.traceSend(stream, _logger, _traceLevels);
2466 
2467             //
2468             // Send the message without blocking.
2469             //
2470             if(_observer != null)
2471             {
2472                 observerStartWrite(message.stream.getBuffer());
2473             }
2474             int op = write(message.stream.getBuffer());
2475             if(op == 0)
2476             {
2477                 if(_observer != null)
2478                 {
2479                     observerFinishWrite(message.stream.getBuffer());
2480                 }
2481 
2482                 int status = OutgoingAsyncBase.AsyncStatusSent;
2483                 if(message.sent())
2484                 {
2485                     status = status | OutgoingAsyncBase.AsyncStatusInvokeSentCallback;
2486                 }
2487 
2488                 if(_acmLastActivity > -1)
2489                 {
2490                     _acmLastActivity = Time.currentMonotonicTimeMillis();
2491                 }
2492                 return status;
2493             }
2494 
2495             message.adopt();
2496 
2497             _writeStream.swap(message.stream);
2498             _sendStreams.AddLast(message);
2499             scheduleTimeout(op);
2500             _threadPool.register(this, op);
2501             return OutgoingAsyncBase.AsyncStatusQueued;
2502         }
2503 
doCompress(OutputStream uncompressed, bool compress)2504         private OutputStream doCompress(OutputStream uncompressed, bool compress)
2505         {
2506             if(_compressionSupported)
2507             {
2508                 if(compress && uncompressed.size() >= 100)
2509                 {
2510                     //
2511                     // Do compression.
2512                     //
2513                     IceInternal.Buffer cbuf = BZip2.compress(uncompressed.getBuffer(), Protocol.headerSize,
2514                                                              _compressionLevel);
2515                     if(cbuf != null)
2516                     {
2517                         OutputStream cstream =
2518                             new OutputStream(uncompressed.instance(), uncompressed.getEncoding(), cbuf, true);
2519 
2520                         //
2521                         // Set compression status.
2522                         //
2523                         cstream.pos(9);
2524                         cstream.writeByte(2);
2525 
2526                         //
2527                         // Write the size of the compressed stream into the header.
2528                         //
2529                         cstream.pos(10);
2530                         cstream.writeInt(cstream.size());
2531 
2532                         //
2533                         // Write the compression status and size of the compressed stream into the header of the
2534                         // uncompressed stream -- we need this to trace requests correctly.
2535                         //
2536                         uncompressed.pos(9);
2537                         uncompressed.writeByte(2);
2538                         uncompressed.writeInt(cstream.size());
2539 
2540                         return cstream;
2541                     }
2542                 }
2543             }
2544 
2545             uncompressed.pos(9);
2546             uncompressed.writeByte((byte)((_compressionSupported && compress) ? 1 : 0));
2547 
2548             //
2549             // Not compressed, fill in the message size.
2550             //
2551             uncompressed.pos(10);
2552             uncompressed.writeInt(uncompressed.size());
2553 
2554             return uncompressed;
2555         }
2556 
2557         private struct MessageInfo
2558         {
2559             public InputStream stream;
2560             public int invokeNum;
2561             public int requestId;
2562             public byte compress;
2563             public ServantManager servantManager;
2564             public ObjectAdapter adapter;
2565             public OutgoingAsyncBase outAsync;
2566             public HeartbeatCallback heartbeatCallback;
2567             public int messageDispatchCount;
2568         }
2569 
parseMessage(ref MessageInfo info)2570         private int parseMessage(ref MessageInfo info)
2571         {
2572             Debug.Assert(_state > StateNotValidated && _state < StateClosed);
2573 
2574             info.stream = new InputStream(_instance, Util.currentProtocolEncoding);
2575             _readStream.swap(info.stream);
2576             _readStream.resize(Protocol.headerSize);
2577             _readStream.pos(0);
2578             _readHeader = true;
2579 
2580             Debug.Assert(info.stream.pos() == info.stream.size());
2581 
2582             //
2583             // Connection is validated on first message. This is only used by
2584             // setState() to check wether or not we can print a connection
2585             // warning (a client might close the connection forcefully if the
2586             // connection isn't validated).
2587             //
2588             _validated = true;
2589 
2590             try
2591             {
2592                 //
2593                 // The magic and version fields have already been checked.
2594                 //
2595                 info.stream.pos(8);
2596                 byte messageType = info.stream.readByte();
2597                 info.compress = info.stream.readByte();
2598                 if(info.compress == 2)
2599                 {
2600                     if(_compressionSupported)
2601                     {
2602                         IceInternal.Buffer ubuf = BZip2.uncompress(info.stream.getBuffer(), Protocol.headerSize,
2603                                                                    _messageSizeMax);
2604                         info.stream = new InputStream(info.stream.instance(), info.stream.getEncoding(), ubuf, true);
2605                     }
2606                     else
2607                     {
2608                         string lib = AssemblyUtil.isWindows ? "bzip2.dll" : "libbz2.so.1";
2609                         FeatureNotSupportedException ex = new FeatureNotSupportedException();
2610                         ex.unsupportedFeature = "Cannot uncompress compressed message: " + lib + " not found";
2611                         throw ex;
2612                     }
2613                 }
2614                 info.stream.pos(Protocol.headerSize);
2615 
2616                 switch(messageType)
2617                 {
2618                     case Protocol.closeConnectionMsg:
2619                     {
2620                         TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
2621                         if(_endpoint.datagram())
2622                         {
2623                             if(_warn)
2624                             {
2625                                 _logger.warning("ignoring close connection message for datagram connection:\n" + _desc);
2626                             }
2627                         }
2628                         else
2629                         {
2630                             setState(StateClosingPending, new CloseConnectionException());
2631 
2632                             //
2633                             // Notify the transceiver of the graceful connection closure.
2634                             //
2635                             int op = _transceiver.closing(false, _exception);
2636                             if(op != 0)
2637                             {
2638                                 return op;
2639                             }
2640                             setState(StateClosed);
2641                         }
2642                         break;
2643                     }
2644 
2645                     case Protocol.requestMsg:
2646                     {
2647                         if(_state >= StateClosing)
2648                         {
2649                             TraceUtil.trace("received request during closing\n" +
2650                                             "(ignored by server, client will retry)", info.stream, _logger,
2651                                             _traceLevels);
2652                         }
2653                         else
2654                         {
2655                             TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
2656                             info.requestId = info.stream.readInt();
2657                             info.invokeNum = 1;
2658                             info.servantManager = _servantManager;
2659                             info.adapter = _adapter;
2660                             ++info.messageDispatchCount;
2661                         }
2662                         break;
2663                     }
2664 
2665                     case Protocol.requestBatchMsg:
2666                     {
2667                         if(_state >= StateClosing)
2668                         {
2669                             TraceUtil.trace("received batch request during closing\n" +
2670                                             "(ignored by server, client will retry)", info.stream, _logger,
2671                                             _traceLevels);
2672                         }
2673                         else
2674                         {
2675                             TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
2676                             info.invokeNum = info.stream.readInt();
2677                             if(info.invokeNum < 0)
2678                             {
2679                                 info.invokeNum = 0;
2680                                 throw new UnmarshalOutOfBoundsException();
2681                             }
2682                             info.servantManager = _servantManager;
2683                             info.adapter = _adapter;
2684                             info.messageDispatchCount += info.invokeNum;
2685                         }
2686                         break;
2687                     }
2688 
2689                     case Protocol.replyMsg:
2690                     {
2691                         TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
2692                         info.requestId = info.stream.readInt();
2693                         if(_asyncRequests.TryGetValue(info.requestId, out info.outAsync))
2694                         {
2695                             _asyncRequests.Remove(info.requestId);
2696 
2697                             info.outAsync.getIs().swap(info.stream);
2698 
2699                             //
2700                             // If we just received the reply for a request which isn't acknowledge as
2701                             // sent yet, we queue the reply instead of processing it right away. It
2702                             // will be processed once the write callback is invoked for the message.
2703                             //
2704                             OutgoingMessage message = _sendStreams.Count > 0 ? _sendStreams.First.Value : null;
2705                             if(message != null && message.outAsync == info.outAsync)
2706                             {
2707                                 message.receivedReply = true;
2708                             }
2709                             else if(info.outAsync.response())
2710                             {
2711                                 ++info.messageDispatchCount;
2712                             }
2713                             else
2714                             {
2715                                 info.outAsync = null;
2716                             }
2717                             Monitor.PulseAll(this); // Notify threads blocked in close()
2718                         }
2719                         break;
2720                     }
2721 
2722                     case Protocol.validateConnectionMsg:
2723                     {
2724                         TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
2725                         if(_heartbeatCallback != null)
2726                         {
2727                             info.heartbeatCallback = _heartbeatCallback;
2728                             ++info.messageDispatchCount;
2729                         }
2730                         break;
2731                     }
2732 
2733                     default:
2734                     {
2735                         TraceUtil.trace("received unknown message\n(invalid, closing connection)",
2736                                         info.stream, _logger, _traceLevels);
2737                         throw new UnknownMessageException();
2738                     }
2739                 }
2740             }
2741             catch(LocalException ex)
2742             {
2743                 if(_endpoint.datagram())
2744                 {
2745                     if(_warn)
2746                     {
2747                         _logger.warning("datagram connection exception:\n" + ex.ToString() + "\n" + _desc);
2748                     }
2749                 }
2750                 else
2751                 {
2752                     setState(StateClosed, ex);
2753                 }
2754             }
2755 
2756             return _state == StateHolding ? SocketOperation.None : SocketOperation.Read;
2757         }
2758 
invokeAll(InputStream stream, int invokeNum, int requestId, byte compress, ServantManager servantManager, ObjectAdapter adapter)2759         private void invokeAll(InputStream stream, int invokeNum, int requestId, byte compress,
2760                                ServantManager servantManager, ObjectAdapter adapter)
2761         {
2762             //
2763             // Note: In contrast to other private or protected methods, this
2764             // operation must be called *without* the mutex locked.
2765             //
2766 
2767             Incoming inc = null;
2768             try
2769             {
2770                 while(invokeNum > 0)
2771                 {
2772                     //
2773                     // Prepare the invocation.
2774                     //
2775                     bool response = !_endpoint.datagram() && requestId != 0;
2776                     Debug.Assert(!response || invokeNum == 1);
2777 
2778                     inc = getIncoming(adapter, response, compress, requestId);
2779 
2780                     //
2781                     // Dispatch the invocation.
2782                     //
2783                     inc.invoke(servantManager, stream);
2784 
2785                     --invokeNum;
2786 
2787                     reclaimIncoming(inc);
2788                     inc = null;
2789                 }
2790 
2791                 stream.clear();
2792             }
2793             catch(LocalException ex)
2794             {
2795                 invokeException(requestId, ex, invokeNum, false);
2796             }
2797             finally
2798             {
2799                 if(inc != null)
2800                 {
2801                     reclaimIncoming(inc);
2802                 }
2803             }
2804         }
2805 
scheduleTimeout(int status)2806         private void scheduleTimeout(int status)
2807         {
2808             int timeout;
2809             if(_state < StateActive)
2810             {
2811                 DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
2812                 if(defaultsAndOverrides.overrideConnectTimeout)
2813                 {
2814                     timeout = defaultsAndOverrides.overrideConnectTimeoutValue;
2815                 }
2816                 else
2817                 {
2818                     timeout = _endpoint.timeout();
2819                 }
2820             }
2821             else if(_state < StateClosingPending)
2822             {
2823                 if(_readHeader) // No timeout for reading the header.
2824                 {
2825                     status &= ~SocketOperation.Read;
2826                 }
2827                 timeout = _endpoint.timeout();
2828             }
2829             else
2830             {
2831                 DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
2832                 if(defaultsAndOverrides.overrideCloseTimeout)
2833                 {
2834                     timeout = defaultsAndOverrides.overrideCloseTimeoutValue;
2835                 }
2836                 else
2837                 {
2838                     timeout = _endpoint.timeout();
2839                 }
2840             }
2841 
2842             if(timeout < 0)
2843             {
2844                 return;
2845             }
2846 
2847             if((status & SocketOperation.Read) != 0)
2848             {
2849                 if(_readTimeoutScheduled)
2850                 {
2851                     _timer.cancel(_readTimeout);
2852                 }
2853                 _timer.schedule(_readTimeout, timeout);
2854                 _readTimeoutScheduled = true;
2855             }
2856             if((status & (SocketOperation.Write | SocketOperation.Connect)) != 0)
2857             {
2858                 if(_writeTimeoutScheduled)
2859                 {
2860                     _timer.cancel(_writeTimeout);
2861                 }
2862                 _timer.schedule(_writeTimeout, timeout);
2863                 _writeTimeoutScheduled = true;
2864             }
2865         }
2866 
unscheduleTimeout(int status)2867         private void unscheduleTimeout(int status)
2868         {
2869             if((status & SocketOperation.Read) != 0 && _readTimeoutScheduled)
2870             {
2871                 _timer.cancel(_readTimeout);
2872                 _readTimeoutScheduled = false;
2873             }
2874             if((status & (SocketOperation.Write | SocketOperation.Connect)) != 0 &&
2875                _writeTimeoutScheduled)
2876             {
2877                 _timer.cancel(_writeTimeout);
2878                 _writeTimeoutScheduled = false;
2879             }
2880         }
2881 
initConnectionInfo()2882         private ConnectionInfo initConnectionInfo()
2883         {
2884             if(_state > StateNotInitialized && _info != null) // Update the connection info until it's initialized
2885             {
2886                 return _info;
2887             }
2888 
2889             try
2890             {
2891                 _info = _transceiver.getInfo();
2892             }
2893             catch(LocalException)
2894             {
2895                 _info = new ConnectionInfo();
2896             }
2897             for(ConnectionInfo info = _info; info != null; info = info.underlying)
2898             {
2899                 info.connectionId = _endpoint.connectionId();
2900                 info.adapterName = _adapter != null ? _adapter.getName() : "";
2901                 info.incoming = _connector == null;
2902             }
2903             return _info;
2904         }
2905 
reap()2906         private void reap()
2907         {
2908             if(_monitor != null)
2909             {
2910                 _monitor.reap(this);
2911             }
2912             if(_observer != null)
2913             {
2914                 _observer.detach();
2915             }
2916         }
2917 
toConnectionState(int state)2918         ConnectionState toConnectionState(int state)
2919         {
2920             return connectionStateMap[state];
2921         }
2922 
warning(string msg, System.Exception ex)2923         private void warning(string msg, System.Exception ex)
2924         {
2925             _logger.warning(msg + ":\n" + ex + "\n" + _transceiver.ToString());
2926         }
2927 
observerStartRead(IceInternal.Buffer buf)2928         private void observerStartRead(IceInternal.Buffer buf)
2929         {
2930             if(_readStreamPos >= 0)
2931             {
2932                 Debug.Assert(!buf.empty());
2933                 _observer.receivedBytes(buf.b.position() - _readStreamPos);
2934             }
2935             _readStreamPos = buf.empty() ? -1 : buf.b.position();
2936         }
2937 
observerFinishRead(IceInternal.Buffer buf)2938         private void observerFinishRead(IceInternal.Buffer buf)
2939         {
2940             if(_readStreamPos == -1)
2941             {
2942                 return;
2943             }
2944             Debug.Assert(buf.b.position() >= _readStreamPos);
2945             _observer.receivedBytes(buf.b.position() - _readStreamPos);
2946             _readStreamPos = -1;
2947         }
2948 
observerStartWrite(IceInternal.Buffer buf)2949         private void observerStartWrite(IceInternal.Buffer buf)
2950         {
2951             if(_writeStreamPos >= 0)
2952             {
2953                 Debug.Assert(!buf.empty());
2954                 _observer.sentBytes(buf.b.position() - _writeStreamPos);
2955             }
2956             _writeStreamPos = buf.empty() ? -1 : buf.b.position();
2957         }
2958 
observerFinishWrite(IceInternal.Buffer buf)2959         private void observerFinishWrite(IceInternal.Buffer buf)
2960         {
2961             if(_writeStreamPos == -1)
2962             {
2963                 return;
2964             }
2965             if(buf.b.position() > _writeStreamPos)
2966             {
2967                 _observer.sentBytes(buf.b.position() - _writeStreamPos);
2968             }
2969             _writeStreamPos = -1;
2970         }
2971 
getIncoming(ObjectAdapter adapter, bool response, byte compress, int requestId)2972         private Incoming getIncoming(ObjectAdapter adapter, bool response, byte compress, int requestId)
2973         {
2974             Incoming inc = null;
2975 
2976             if(_cacheBuffers)
2977             {
2978                 lock(_incomingCacheMutex)
2979                 {
2980                     if(_incomingCache == null)
2981                     {
2982                         inc = new Incoming(_instance, this, this, adapter, response, compress, requestId);
2983                     }
2984                     else
2985                     {
2986                         inc = _incomingCache;
2987                         _incomingCache = _incomingCache.next;
2988                         inc.reset(_instance, this, this, adapter, response, compress, requestId);
2989                         inc.next = null;
2990                     }
2991                 }
2992             }
2993             else
2994             {
2995                 inc = new Incoming(_instance, this, this, adapter, response, compress, requestId);
2996             }
2997 
2998             return inc;
2999         }
3000 
reclaimIncoming(Incoming inc)3001         internal void reclaimIncoming(Incoming inc)
3002         {
3003             if(_cacheBuffers && inc.reclaim())
3004             {
3005                 lock(_incomingCacheMutex)
3006                 {
3007                     inc.next = _incomingCache;
3008                     _incomingCache = inc;
3009                 }
3010             }
3011         }
3012 
read(IceInternal.Buffer buf)3013         private int read(IceInternal.Buffer buf)
3014         {
3015             int start = buf.b.position();
3016             int op = _transceiver.read(buf, ref _hasMoreData);
3017             if(_instance.traceLevels().network >= 3 && buf.b.position() != start)
3018             {
3019                 StringBuilder s = new StringBuilder("received ");
3020                 if(_endpoint.datagram())
3021                 {
3022                     s.Append(buf.b.limit());
3023                 }
3024                 else
3025                 {
3026                     s.Append(buf.b.position() - start);
3027                     s.Append(" of ");
3028                     s.Append(buf.b.limit() - start);
3029                 }
3030                 s.Append(" bytes via ");
3031                 s.Append(_endpoint.protocol());
3032                 s.Append("\n");
3033                 s.Append(ToString());
3034                 _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
3035             }
3036             return op;
3037         }
3038 
write(IceInternal.Buffer buf)3039         private int write(IceInternal.Buffer buf)
3040         {
3041             int start = buf.b.position();
3042             int op = _transceiver.write(buf);
3043             if(_instance.traceLevels().network >= 3 && buf.b.position() != start)
3044             {
3045                 StringBuilder s = new StringBuilder("sent ");
3046                 s.Append(buf.b.position() - start);
3047                 if(!_endpoint.datagram())
3048                 {
3049                     s.Append(" of ");
3050                     s.Append(buf.b.limit() - start);
3051                 }
3052                 s.Append(" bytes via ");
3053                 s.Append(_endpoint.protocol());
3054                 s.Append("\n");
3055                 s.Append(ToString());
3056                 _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
3057             }
3058             return op;
3059         }
3060 
3061         private class OutgoingMessage
3062         {
OutgoingMessage(OutputStream stream, bool compress, bool adopt)3063             internal OutgoingMessage(OutputStream stream, bool compress, bool adopt)
3064             {
3065                 this.stream = stream;
3066                 this.compress = compress;
3067                 _adopt = adopt;
3068             }
3069 
OutgoingMessage(OutgoingAsyncBase outAsync, OutputStream stream, bool compress, int requestId)3070             internal OutgoingMessage(OutgoingAsyncBase outAsync, OutputStream stream, bool compress, int requestId)
3071             {
3072                 this.outAsync = outAsync;
3073                 this.stream = stream;
3074                 this.compress = compress;
3075                 this.requestId = requestId;
3076             }
3077 
canceled()3078             internal void canceled()
3079             {
3080                 Debug.Assert(outAsync != null); // Only requests can timeout.
3081                 outAsync = null;
3082             }
3083 
adopt()3084             internal void adopt()
3085             {
3086                 if(_adopt)
3087                 {
3088                     OutputStream stream = new OutputStream(this.stream.instance(), Util.currentProtocolEncoding);
3089                     stream.swap(this.stream);
3090                     this.stream = stream;
3091                     _adopt = false;
3092                 }
3093             }
3094 
sent()3095             internal bool sent()
3096             {
3097                 stream = null;
3098                 if(outAsync != null)
3099                 {
3100                     invokeSent = outAsync.sent();
3101                     return invokeSent ||receivedReply;
3102                 }
3103                 return false;
3104             }
3105 
completed(LocalException ex)3106             internal void completed(LocalException ex)
3107             {
3108                 if(outAsync != null)
3109                 {
3110                     if(outAsync.exception(ex))
3111                     {
3112                         outAsync.invokeException();
3113                     }
3114                 }
3115                 stream = null;
3116             }
3117 
3118             internal OutputStream stream;
3119             internal OutgoingAsyncBase outAsync;
3120             internal bool compress;
3121             internal int requestId;
3122             internal bool _adopt;
3123             internal bool prepared;
3124             internal bool isSent;
3125             internal bool invokeSent;
3126             internal bool receivedReply;
3127         }
3128 
3129         private Communicator _communicator;
3130         private Instance _instance;
3131         private ACMMonitor _monitor;
3132         private Transceiver _transceiver;
3133         private string _desc;
3134         private string _type;
3135         private Connector _connector;
3136         private EndpointI _endpoint;
3137 
3138         private ObjectAdapter _adapter;
3139         private ServantManager _servantManager;
3140 
3141         private Logger _logger;
3142         private TraceLevels _traceLevels;
3143         private IceInternal.ThreadPool _threadPool;
3144 
3145         private IceInternal.Timer _timer;
3146         private TimerTask _writeTimeout;
3147         private bool _writeTimeoutScheduled;
3148         private TimerTask _readTimeout;
3149         private bool _readTimeoutScheduled;
3150 
3151         private StartCallback _startCallback = null;
3152 
3153         private bool _warn;
3154         private bool _warnUdp;
3155 
3156         private long _acmLastActivity;
3157 
3158         private int _compressionLevel;
3159 
3160         private int _nextRequestId;
3161 
3162         private Dictionary<int, OutgoingAsyncBase> _asyncRequests = new Dictionary<int, OutgoingAsyncBase>();
3163 
3164         private LocalException _exception;
3165 
3166         private readonly int _messageSizeMax;
3167         private BatchRequestQueue _batchRequestQueue;
3168 
3169         private LinkedList<OutgoingMessage> _sendStreams = new LinkedList<OutgoingMessage>();
3170 
3171         private InputStream _readStream;
3172         private bool _readHeader;
3173         private OutputStream _writeStream;
3174 
3175         private ConnectionObserver _observer;
3176         private int _readStreamPos;
3177         private int _writeStreamPos;
3178 
3179         private int _dispatchCount;
3180 
3181         private int _state; // The current state.
3182         private bool _shutdownInitiated = false;
3183         private bool _initialized = false;
3184         private bool _validated = false;
3185 
3186         private Incoming _incomingCache;
3187         private object _incomingCacheMutex = new object();
3188 
3189         private static bool _compressionSupported;
3190 
3191         private bool _cacheBuffers;
3192 
3193         private ConnectionInfo _info;
3194 
3195         private CloseCallback _closeCallback;
3196         private HeartbeatCallback _heartbeatCallback;
3197 
3198         private static ConnectionState[] connectionStateMap = new ConnectionState[] {
3199             ConnectionState.ConnectionStateValidating,   // StateNotInitialized
3200             ConnectionState.ConnectionStateValidating,   // StateNotValidated
3201             ConnectionState.ConnectionStateActive,       // StateActive
3202             ConnectionState.ConnectionStateHolding,      // StateHolding
3203             ConnectionState.ConnectionStateClosing,      // StateClosing
3204             ConnectionState.ConnectionStateClosing,      // StateClosingPending
3205             ConnectionState.ConnectionStateClosed,       // StateClosed
3206             ConnectionState.ConnectionStateClosed,       // StateFinished
3207         };
3208     }
3209 }
3210