//
// Copyright (c) ZeroC, Inc. All rights reserved.
//

namespace IceInternal
{
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Threading;

    /// <summary>
    /// Owns a single background thread that executes queued
    /// <c>ThreadPoolWorkItem</c> callbacks one batch at a time, in the order
    /// they were queued.
    /// </summary>
    public class AsyncIOThread
    {
        /// <summary>
        /// Creates the helper thread, attaches the thread observer (if the
        /// communicator has one) and starts the thread with the priority
        /// parsed from the "Ice.ThreadPriority" property.
        /// </summary>
        /// <param name="instance">Instance supplying properties, logger and observer.</param>
        internal AsyncIOThread(Instance instance)
        {
            _instance = instance;

            _thread = new HelperThread(this);
            updateObserver();
            _thread.Start(Util.stringToThreadPriority(
                              instance.initializationData().properties.getProperty("Ice.ThreadPriority")));
        }

        /// <summary>
        /// Re-queries the communicator observer for this thread's observer and
        /// attaches the result. Does nothing when no communicator observer is
        /// configured.
        /// </summary>
        public void updateObserver()
        {
            lock(_mutex)
            {
                Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer;
                if(obsv != null)
                {
                    // The previous observer is handed back to getThreadObserver.
                    _observer = obsv.getThreadObserver("Communicator",
                                                       _thread.getName(),
                                                       Ice.Instrumentation.ThreadState.ThreadStateIdle,
                                                       _observer);
                    if(_observer != null)
                    {
                        _observer.attach();
                    }
                }
            }
        }

        /// <summary>
        /// Appends a work item to the pending queue and wakes the worker thread.
        /// Must not be called after <see cref="destroy"/> (asserted in debug builds).
        /// </summary>
        /// <param name="callback">The work item to execute on the IO thread.</param>
        public void queue(ThreadPoolWorkItem callback)
        {
            lock(_mutex)
            {
                Debug.Assert(!_destroyed);
                _queue.AddLast(callback);
                Monitor.Pulse(_mutex);
            }
        }

        /// <summary>
        /// Marks the thread as destroyed and wakes it. The worker still runs
        /// the work items that are already queued before it exits.
        /// Must be called at most once (asserted in debug builds).
        /// </summary>
        public void destroy()
        {
            lock(_mutex)
            {
                Debug.Assert(!_destroyed);
                _destroyed = true;
                Monitor.Pulse(_mutex);
            }
        }

        /// <summary>
        /// Blocks until the worker thread has terminated.
        /// </summary>
        public void joinWithThread()
        {
            if(_thread != null)
            {
                _thread.Join();
            }
        }

        /// <summary>
        /// Worker loop: waits for work, swaps the pending queue with a local
        /// one while holding the lock, then runs the batch outside the lock.
        /// Exceptions thrown by a work item are logged and do not stop the loop.
        /// Returns once the thread is destroyed and the pending queue is empty.
        /// </summary>
        public void run()
        {
            LinkedList<ThreadPoolWorkItem> queue = new LinkedList<ThreadPoolWorkItem>();
            bool inUse = false;
            while(true)
            {
                lock(_mutex)
                {
                    // Report the transition back to idle for the batch just finished.
                    if(_observer != null && inUse)
                    {
                        _observer.stateChanged(Ice.Instrumentation.ThreadState.ThreadStateInUseForIO,
                                               Ice.Instrumentation.ThreadState.ThreadStateIdle);
                        inUse = false;
                    }

                    if(_destroyed && _queue.Count == 0)
                    {
                        break;
                    }

                    while(!_destroyed && _queue.Count == 0)
                    {
                        Monitor.Wait(_mutex);
                    }

                    // Swap the (already cleared) local list with the pending list so
                    // producers can keep queueing while this batch is executed.
                    LinkedList<ThreadPoolWorkItem> tmp = queue;
                    queue = _queue;
                    _queue = tmp;

                    if(_observer != null)
                    {
                        _observer.stateChanged(Ice.Instrumentation.ThreadState.ThreadStateIdle,
                                               Ice.Instrumentation.ThreadState.ThreadStateInUseForIO);
                        inUse = true;
                    }
                }

                foreach(ThreadPoolWorkItem cb in queue)
                {
                    try
                    {
                        cb();
                    }
                    catch(Ice.LocalException ex)
                    {
                        string s = "exception in asynchronous IO thread:\n" + ex;
                        _instance.initializationData().logger.error(s);
                    }
                    catch(System.Exception ex)
                    {
                        string s = "unknown exception in asynchronous IO thread:\n" + ex;
                        _instance.initializationData().logger.error(s);
                    }
                }
                queue.Clear();
            }

            if(_observer != null)
            {
                _observer.detach();
            }
        }

        private readonly Instance _instance;
        private bool _destroyed;
        private LinkedList<ThreadPoolWorkItem> _queue = new LinkedList<ThreadPoolWorkItem>();
        private Ice.Instrumentation.ThreadObserver _observer;

        // Private monitor guarding _destroyed, _queue and _observer. A dedicated
        // object is used instead of 'this' so that code outside this class cannot
        // take the same lock or interfere with the Wait/Pulse signalling.
        private readonly object _mutex = new object();

        /// <summary>
        /// Wraps the underlying <see cref="Thread"/> and computes its name,
        /// "Ice.AsyncIOThread" prefixed by "Ice.ProgramName-" when that
        /// property is not empty.
        /// </summary>
        private sealed class HelperThread
        {
            internal HelperThread(AsyncIOThread asyncIOThread)
            {
                _asyncIOThread = asyncIOThread;
                _name = _asyncIOThread._instance.initializationData().properties.getProperty("Ice.ProgramName");
                if(_name.Length > 0)
                {
                    _name += "-";
                }
                _name += "Ice.AsyncIOThread";
            }

            /// <summary>Blocks until the underlying thread terminates.</summary>
            public void Join()
            {
                _thread.Join();
            }

            /// <summary>Returns the name given to the underlying thread.</summary>
            public string getName()
            {
                return _name;
            }

            /// <summary>
            /// Creates and starts the underlying background thread.
            /// </summary>
            /// <param name="priority">Scheduling priority to apply to the thread.</param>
            public void Start(ThreadPriority priority)
            {
                _thread = new Thread(new ThreadStart(Run));
                _thread.IsBackground = true;
                _thread.Name = _name;
                // Apply the requested priority; previously the argument was ignored.
                // A new thread already defaults to Normal, so only override otherwise.
                if(priority != ThreadPriority.Normal)
                {
                    _thread.Priority = priority;
                }
                _thread.Start();
            }

            /// <summary>Thread entry point; delegates to the owner's worker loop.</summary>
            public void Run()
            {
                _asyncIOThread.run();
            }

            private readonly AsyncIOThread _asyncIOThread;
            private readonly string _name;
            private Thread _thread;
        }

        private readonly HelperThread _thread;
    }
}