1 /*********************************************************************
2  *
3  * AUTHORIZATION TO USE AND DISTRIBUTE
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that:
7  *
8  * (1) source code distributions retain this paragraph in its entirety,
9  *
10  * (2) distributions including binary code include this paragraph in
11  *     its entirety in the documentation or other materials provided
12  *     with the distribution, and
13  *
14  * (3) all advertising materials mentioning features or use of this
15  *     software display the following acknowledgment:
16  *
17  *      "This product includes software written and developed
18  *       by Brian Adamson and Joe Macker of the Naval Research
19  *       Laboratory (NRL)."
20  *
21  *  The name of NRL, the name(s) of NRL  employee(s), or any entity
22  *  of the United States Government may not be used to endorse or
23  *  promote  products derived from this software, nor does the
24  *  inclusion of the NRL written and developed software  directly or
25  *  indirectly suggest NRL or United States  Government endorsement
26  *  of this product.
27  *
28  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND WITHOUT ANY EXPRESS OR
29  * IMPLIED WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED
30  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
31  ********************************************************************/
32 
33 #ifndef _PROTO_DISPATCHER
34 #define _PROTO_DISPATCHER
35 
36 #include "protoTimer.h"
37 #include "protoSocket.h"
38 #include "protoChannel.h"
39 
40 #ifdef WIN32
41 #ifndef _WIN32_WCE
42 #include <process.h>  // for _beginthreadex/_endthreadex
43 #endif // !_WIN32_WCE
44 #else
45 #include <pthread.h>
46 #include <unistd.h>  // for read()pwd
47 #endif // if/else WIN32/UNIX
48 
49 #ifdef LINUX
50 #ifdef USE_TIMERFD
51 #include <sys/timerfd.h>
52 #endif  // USE_TIMERFD
53 #endif // LINUX
54 
55 /**
56  * @class ProtoDispatcher
57  *
58  * @brief This class provides a core around which Unix and Win32 applications
59  * using Protolib can be implemented.
60  *
61  * It's "Run()" method provides a "main loop" which uses
62  * the "select()" system call on Unix and the
63  * similar "MsgWaitForMultipleObjectsEx()" system call on Win32.  It
64  * is planned to eventually provide some built-in support for threading
65  * in the future (e.g. the ProtoDispatcher::Run() method might execute
66  * in a thread, dispatching events to a parent thread).
67  */
68 
69 
70 
71  /***********************
72     ProtoDispatcher Update Notes
73 
74     ProtoDispatcher is going to be updated to use epoll() and kevent()
75     APIs instead of select() for supporting Linux and BSD operating systems.
76     Note select() usage will still be supported, but the code is going to
77     be streamlined significantly as part of this update.  Since there are
78     going to be added wait/dispatch paradigms put in place, the following
79     notes summarize these (i.e. "select()", "epoll()", "kevent()", and
80     "WaitForMultipleObjectsEx()"
81 
82 
83     select() - Each call to select() requires populated file descriptor sets
84                for "input", "output", and "exceptions".  These "fdsets" need
85                to set prior to each call to select().  ProtoDispatcher can keep
86                reference input, output, and exception "fdsets" that are modified
87                with calls to Update{Channel | Socket | Generic}Notification() and
88                then FD_COPY() with each call to select().
89                A threaded ProtoDispatcher can be "broken" from the "select()" based
90                wait state using its built-in "break pipe" (self-pipe set up for this)
91 
92                Summary of ProtoDispatcher state/ops needed:
93                   a) Keep list of active "streams"
94                   b) for time efficiency, keep reference fdsets for FD_COPY() to select()
95                   c) on dispatch, iterate through active "streams" and check FD_ISSET()
96 
97     epoll()  - The "epoll_ctl()" function is used to set/change/unset the list of
98                descriptors that epoll() monitors.  Then epoll_wait() is a blocking
99                call that is used to get notification of events.  For Linux, we
100                can use the timerfd, signalfd, and eventfd functions to set up
101                waitable descriptors for timing, signals, and notional "events".
102                We can use eventfd() instead of the "break pipe" on Linux.
103 
104                Summary of ProtoDispatcher state/ops needed:
105                   a) Keep list of active "streams"
106                   b) changes to stream status, handled directly with epoll_ctl, no extra state needed
107                   c) on dispatch, dereference associated "stream" using epoll_event.data.ptr
108 
109     kevent() - A "changelist" is passed to each call of kevent() to optionally
110                modify the set of things monitored by a given kqueue().  The same
111                "struct kevent" array can be used as the "changelist" and the
112                "eventlist" to pass in changes and get pending events with one
113                call.  Note that EVFILT_USER can be used to set up a sort
114                of user-defined "event" that a kevent() call in another thread
115                can "set" for the given kqueue descriptor. This can be used
116                instead of the "break pipe".
117 
118                Summary of ProtoDispatcher state/ops needed:
119                   a) Keep list of active "streams"
120                   b) keep array of "struct kevent" to accumulate stream status changes
121                   c) on dispatch, dereference associated "stream" using epoll_event.data.ptr
122 
123     WaitForMultipleObjectsEx() - An array of handles is passed in.  When one of
124                                  handles is "ready" or "set", the function returns
125                                  giving the value "index + WAIT_OBJECT0" for given
126                                  the array.  ProtoDispatcher can keep a dual arrays
127                                  that are set/modified/unset with calls to
128                                  Update{Channel | Socket | Generic}Notification()
129                                  where one array is the array of handles required
130                                  and a corresponding array has pointers to the
131                                  corresponding ProtoDispatcher::Stream object.
132                                  For objects that have both input and/or output
133                                  handles, separate "index" and "outdex" values are
134                                  kept.
135 
136 
137 
138     THE PLAN:
139         Step 1: Streamline current Unix/Win32 code that uses select()/WaitForMultipleObjectEx()
140                 so we can cleanly incorporate the alternative epoll() / kevent() implementation
141                 (e.g. make "thread break" consistent/modular, consolidate Channel/Socket/Generic stream
142                 into a single data structure that is randomly accessible and rapidly
143                 iterable (e.g. ProtoSortedTree).
144 
145         Step 2: Extend Linux version to formally use timerfd, eventfd, and signalfd APIs with select()?
146 
147         Step 3: Probably tackle "kevent()" implementation next?
148 
149         Step 4: Finally, do "epoll()" implementation
150 
151 
152  ************************/
153 
154 
155 
156 
157 /// Asynchronous event dispatcher class
158 class ProtoDispatcher : public ProtoTimerMgr,
159                         public ProtoSocket::Notifier,
160                         public ProtoChannel::Notifier
161 {
162     public:
163     // Construction
164         ProtoDispatcher();
165         virtual ~ProtoDispatcher();
166         void Destroy();
167 
168     // ProtoTimerMgr overrides
ActivateTimer(ProtoTimer & theTimer)169         void ActivateTimer(ProtoTimer& theTimer)
170         {
171             // TBD - should we SignalThread() the thread to wake it up?
172             SuspendThread();
173             ProtoTimerMgr::ActivateTimer(theTimer);
174             ResumeThread();
175         }
DeactivateTimer(ProtoTimer & theTimer)176         void DeactivateTimer(ProtoTimer& theTimer)
177         {
178             SuspendThread();
179             ProtoTimerMgr::DeactivateTimer(theTimer);
180             ResumeThread();
181         }
182 
183     // Methods to manage generic input/output streams (pipes, files, devices, etc)
184 #ifdef WIN32
185         typedef HANDLE Descriptor;  // WIN32 uses "HANDLE" type for descriptors
186 #else
187         typedef int Descriptor;     // UNIX uses "int" type for descriptors
188 #endif // if/selse WIN32
189         static const Descriptor INVALID_DESCRIPTOR;
190         enum Event {EVENT_INPUT, EVENT_OUTPUT};
191         typedef void (Callback)(ProtoDispatcher::Descriptor descriptor,
192                                 ProtoDispatcher::Event      theEvent,
193                                 const void*                 userData);
InstallGenericInput(ProtoDispatcher::Descriptor descriptor,ProtoDispatcher::Callback * callback,const void * clientData)194         bool InstallGenericInput(ProtoDispatcher::Descriptor descriptor,
195                                  ProtoDispatcher::Callback*  callback,
196                                  const void*                 clientData)
197         {
198             SignalThread();
199             bool result = InstallGenericStream(descriptor, callback, clientData, Stream::INPUT);
200             UnsignalThread();
201             return result;
202         }
RemoveGenericInput(Descriptor descriptor)203         void RemoveGenericInput(Descriptor descriptor)
204         {
205             SignalThread();
206             GenericStream* stream = FindGenericStream(descriptor);
207             if (stream)
208             {
209                 stream->UnsetFlag(Stream::INPUT);
210                 if (!stream->HasFlags()) RemoveGenericStream(stream);
211             }
212             UnsignalThread();
213         }
InstallGenericOutput(ProtoDispatcher::Descriptor descriptor,ProtoDispatcher::Callback * callback,const void * clientData)214         bool InstallGenericOutput(ProtoDispatcher::Descriptor descriptor,
215                                   ProtoDispatcher::Callback*  callback,
216                                   const void*                 clientData)
217         {
218             return InstallGenericStream(descriptor, callback, clientData, Stream::OUTPUT);
219         }
RemoveGenericOutput(Descriptor descriptor)220         void RemoveGenericOutput(Descriptor descriptor)
221         {
222             GenericStream* stream = FindGenericStream(descriptor);
223             if (stream)
224             {
225                 stream->UnsetFlag(Stream::OUTPUT);
226                 if (!stream->HasFlags()) RemoveGenericStream(stream);
227             }
228         }
229 
230         // Methods to set up and explicitly control ProtoDispatcher operation
231         /// Are there any pending timeouts/descriptors?
IsPending()232         bool IsPending()
233         {
234             return (IsThreaded() ||
235                     (NULL != generic_stream_list) ||
236                     (NULL != socket_stream_list) ||
237                     (NULL != channel_stream_list) ||
238                     (ProtoTimerMgr::GetTimeRemaining() >= 0.0));
239         }
240         /**
241         * This one can safely be called on a dispatcher _before_
242         * the "StartThread()" or "Run()" method is called without
243         * affecting the current process priority
244          */
SetPriorityBoost(bool state)245 		void SetPriorityBoost(bool state)
246             {priority_boost = state;}
247 
248         /**
249 		* This one will affect the current process priority if
250         * "StartThread()" has not already been called.
251         */
252 		bool BoostPriority();
253 
254         /**
255 		* Block until next event (timer, socket, other, occurs)
256         * (Note this will block forever if !IsPending())
257         */
258 		void Wait();
259         /// Dispatch round of events
260         void Dispatch();
261 
262         /// OR this "main loop" can be used.
263         int Run(bool oneShot = false);
264         void Stop(int exitCode = 0);
265 
IsRunning()266         bool IsRunning() const  // TBD - deprecate IsRunning() method?
267             {return run;}
268         /**
269          * Controls whether time of day is polled for ultra-precise timing
270          * If "precise timing" is set to "true", the time of day is polled
271          * to achieve precise timer timeouts.  However, this can consume
272          * considerable CPU resources.  The default state is "false".
273          */
SetPreciseTiming(bool state)274         void SetPreciseTiming(bool state) {precise_timing = state;}
275 
276 
277 #ifdef WIN32
278         typedef CRITICAL_SECTION    Mutex;
Init(Mutex & m)279         static void Init(Mutex& m) {InitializeCriticalSection(&m);}
Destroy(Mutex & m)280         static void Destroy(Mutex& m) {DeleteCriticalSection(&m);}
Lock(Mutex & m)281         static void Lock(Mutex& m) {EnterCriticalSection(&m);}
Unlock(Mutex & m)282         static void Unlock(Mutex& m) {LeaveCriticalSection(&m);}
283 #else
284         typedef pthread_mutex_t     Mutex;
Init(Mutex & m)285         static void Init(Mutex& m) {pthread_mutex_init(&m, NULL);}
Destroy(Mutex & m)286         static void Destroy(Mutex& m) {pthread_mutex_destroy(&m);}
Lock(Mutex & m)287         static void Lock(Mutex& m) {pthread_mutex_lock(&m);}
Unlock(Mutex & m)288         static void Unlock(Mutex& m) {pthread_mutex_unlock(&m);}
289 #endif // if/else WIN32/UNIX
290         /**
291          * @class Controller
292          *
293 		 * @brief Handles dispatching for the ProtoDispatcher
294 		 * The ProtoDispatcher::Controller helper class allows for
295          * thread-safe synchronization when using the "StartThread()"
296         * mode.
297          */
298         class Controller
299         {
300             public:
301                 virtual ~Controller();
302                 void OnDispatch();
303 
304             protected:
305                 Controller(ProtoDispatcher& theDispatcher);
306 
307             private:
308                 friend class ProtoDispatcher;
309                 bool DoDispatch();  /// only called by ProtoDispatcher
310                 virtual bool SignalDispatchReady() = 0;
OnThreadStop()311                 void OnThreadStop()
312                 {
313                     if (use_lock_a)
314                         ProtoDispatcher::Unlock(lock_a);
315                     else
316                         ProtoDispatcher::Unlock(lock_b);
317                 }
318 
319                 ProtoDispatcher&       dispatcher;
320                 Mutex                  lock_a;
321                 Mutex                  lock_b;
322                 bool                   use_lock_a;
323         };  // end class ProtoDispatcher::Controller()
324         friend class Controller;
325 
326         /// OR the dispatcher can be run in a thread n(w/ optional Controller)
327         bool StartThread(bool                         priorityBoost = false,
328                          ProtoDispatcher::Controller* theController = NULL);
IsThreaded()329         bool IsThreaded() {return ((ThreadId)(NULL) != thread_id);}
330 
331         /**
332 		* (NOTE: If dispatcher instance is threaded, you MUST successfully call
333         *        "SuspendThread()" before calling _any_ dispatcher methods or
334         *        manipulating any objects (ProtoTimers, ProtoSockets, etc)
335         *        which affect dispatcher operation!!!!
336         *        (Call "ResumeThread()" to allow the dispatcher to continue)
337         *        This can be used to create thread-safe protocols/applications
338         *        which use a ProtoDispatcher for async timing and I/O)
339         */
340 		bool SuspendThread();
341         void ResumeThread();
342 
343         // The "prompting" mechanism here is a way to cause a sleeping
344         // thread to wakeup and execute a bit of code as set by the
345         // "PromptCallback"
346         // (TBD) refine this prompting mechanism a little bit
347         // for example, some sort of waiting mechanism
348         // (e.g., the child could prompt its parent?  add "promptData" option to PromptThread()?
349         // should be added to know when the work has been done.
350         // For example, a waitable descriptor. This way a "parent"
351         // ProtoApp or ProtoDispatcher could create/start a pool of
352         // waiting worker threads, assign them jobs, and reap the results
353         // upon completion ...
354         typedef void (PromptCallback)(const void* clientData);
355         /**
356 		* Use this to set a PromptCallback function
357         * (should do this _before_ StartThread() is invoked
358         */
359 		void SetPromptCallback(PromptCallback* theCallback, const void* clientData = NULL)
360         {
361             prompt_callback = theCallback;
362             prompt_client_data = clientData;
363         }
364 
365         /// Call this to force call of  PromptCallback in thread's context
PromptThread()366         bool PromptThread()
367         {
368             if (SuspendThread())
369             {
370                 prompt_set = true;  // indicate prompt_callback should be called
371                 if (!SignalThread())
372                 {
373                     ResumeThread();
374                     return false; // suspend/signal thread
375                 }
376                 UnsignalThread();
377                 ResumeThread();
378                 return true;
379             }
380             else
381             {
382                 return false;
383             }
384         }
385 
386 
387 #ifdef WIN32
388         /// WIN32 apps can call this to create a msg window if desired
389         bool Win32Init();
390         void Win32Cleanup();
391 #endif // WIN32
392 
393     private:
394         bool SignalThread();
395         void UnsignalThread();
WasSignaled()396         bool WasSignaled()
397         {
398             // Check for and reset "break" signal
399             if (IsThreaded())
400             {
401 #ifdef WIN32
402                 if ((WAIT_OBJECT_0 <= wait_status) && (wait_status < (WAIT_OBJECT_0 + stream_count)))
403                 {
404                     GenericStream* theStream =
405                         static_cast<GenericStream*>(stream_ptrs_array[wait_status - WAIT_OBJECT_0]);
406                     if (break_event_stream == theStream)
407                     {
408                         ResetEvent(break_event);
409                         return true;
410                     }
411                 }
412 #else
413                 if ((wait_status > 0) && (FD_ISSET(break_pipe_fd[0], &input_set)))
414                 {
415                     // Reset by emptying pipe
416                     char byte[32];
417                     while (read(break_pipe_fd[0], byte, 32) > 0);
418                     return true;
419                 }
420 #endif // if/else WIN32/UNIX
421             }
422             return false;
423         }
424 
425 
426         /// Associated ProtoTimerMgrs will use this as needed
UpdateSystemTimer(ProtoTimer::Command,double)427         bool UpdateSystemTimer(ProtoTimer::Command /*command*/,
428                                double              /* delay*/)
429         {
430             // ProtoDispatcher::Dispatch() queries ProtoTimerMgr::GetTimeRemaining() instead
431             // This wakes up the dispatcher thread as needed.
432             SignalThread();
433             UnsignalThread();
434             return true;
435         }
436         /// Associated ProtoSockets will use this as needed
437         bool UpdateSocketNotification(ProtoSocket& theSocket,
438                                       int          notifyFlags);
439 
440         /// Associated ProtoChannels will use this as needed
441         bool UpdateChannelNotification(ProtoChannel& theChannel,
442                                        int           notifyFlags);
443         // Thread/Mutex stuff
444 #ifdef WIN32
445         typedef DWORD WaitStatus;
446         typedef DWORD ThreadId;
GetCurrentThread()447         ThreadId GetCurrentThread() {return ::GetCurrentThreadId();}
448 #ifdef _WIN32_WCE
449         typedef DWORD ExitStatus;
DoThreadExit(ExitStatus exitStatus)450         static void DoThreadExit(ExitStatus exitStatus) {ExitThread(exitStatus);}
451 #else
452         typedef unsigned int ExitStatus;
DoThreadExit(ExitStatus exitStatus)453         static void DoThreadExit(ExitStatus exitStatus) {_endthreadex(exitStatus);}
454 #endif // if/else _WIN32_WCE
GetExitStatus()455         ExitStatus GetExitStatus()
456 			{return exit_status;}
457 #else  // Unix
458         typedef int WaitStatus;
459         typedef pthread_t ThreadId;
GetCurrentThread()460         static ThreadId GetCurrentThread() {return pthread_self();}
461         typedef void* ExitStatus;
GetExitStatus()462 		ExitStatus GetExitStatus()  // note pthread uses a _pointer_ to the status value location
463 			{return &exit_status;}
DoThreadExit(ExitStatus exitStatus)464         static void DoThreadExit(ExitStatus exitStatus) {pthread_exit(exitStatus);}
465 #endif // if/else WIN32/UNIX
466 
IsMyself()467         bool IsMyself() {return (GetCurrentThread() == thread_id);}
468         void DestroyThread();
469         //static void DoThreadBreak(ProtoDispatcher::Descriptor descriptor,
470         //                          ProtoDispatcher::Event      theEvent,
471         //                          const void*                 userData);
472         bool InstallBreak();
473         void RemoveBreak();
474 
475 
476         /**
477          * @class Stream
478          *
479          * @brief This class helps manage notification for
480          * protoSockets and generic I/O descriptors
481          */
482         class Stream
483         {
484             public:
485                 enum Type {GENERIC, SOCKET, CHANNEL};
486                 enum Flag {NONE = 0x00, INPUT = 0x01, OUTPUT = 0x02, EXCEPTION = 0x04};
487 
GetType()488                 Type GetType() const {return type;}
IsInput()489                 bool IsInput() const {return FlagIsSet(INPUT);}
IsOutput()490                 bool IsOutput() const {return FlagIsSet(OUTPUT);}
FlagIsSet(Flag theFlag)491                 bool FlagIsSet(Flag theFlag) const {return (0 != (flags & theFlag));}
SetFlag(Flag theFlag)492                 void SetFlag(Flag theFlag) {flags |= theFlag;}
UnsetFlag(Flag theFlag)493                 void UnsetFlag(Flag theFlag) {flags &= ~theFlag;}
SetFlags(int theFlags)494                 void SetFlags(int theFlags) {flags = theFlags;}
HasFlags()495                 bool HasFlags() {return (0 != flags);}
ClearFlags()496                 void ClearFlags() {flags = 0;}
497 
GetNext()498                 Stream* GetNext() const {return next;}
SetNext(Stream * stream)499                 void SetNext(Stream* stream) {next = stream;}
GetPrev()500                 Stream* GetPrev() const {return prev;}
SetPrev(Stream * stream)501                 void SetPrev(Stream* stream) {prev = stream;}
502 #ifdef WIN32
GetIndex()503                 int GetIndex() const {return index;}
SetIndex(int theIndex)504                 void SetIndex(int theIndex) {index = theIndex;}
GetOutdex()505                 int GetOutdex() const {return outdex;}
SetOutdex(int theOutdex)506                 void SetOutdex(int theOutdex) {outdex = theOutdex;}
507 #endif // WIN32
508             protected:
509                 Stream(Type theType);
510 
511             private:
512                 Type  type;
513                 int   flags;
514 #ifdef WIN32
515                 int   index;
516                 int   outdex;
517 #endif // WIN32
518                 Stream* prev;
519                 Stream* next;
520         };  // end class Stream
521 
522         /**
523          * @class SocketStream
524          *
525          * @brief This class helps manage notification for
526          * protoSockets and generic I/O descriptors
527          */
528 
529         class SocketStream : public Stream
530         {
531             public:
532                 SocketStream(ProtoSocket& theSocket);
GetSocket()533                 ProtoSocket& GetSocket() {return *socket;}
SetSocket(ProtoSocket & theSocket)534                 void SetSocket(ProtoSocket& theSocket) {socket = &theSocket;}
535             private:
536                 ProtoSocket*    socket;
537         };  // end class SocketStream
538         SocketStream* GetSocketStream(ProtoSocket& theSocket);
539         void ReleaseSocketStream(SocketStream* socketStream);
540 
541         /**
542          * @class ChannelStream
543          *
544          * @brief This class helps manage notification for
545          * protoSockets and generic I/O descriptors
546          */
547 
548         class ChannelStream : public Stream
549         {
550             public:
551                 ChannelStream(ProtoChannel& theChannel);
GetChannel()552                 ProtoChannel& GetChannel() {return *channel;}
SetChannel(ProtoChannel & theChannel)553                 void SetChannel(ProtoChannel& theChannel) {channel = &theChannel;}
554             private:
555                 ProtoChannel*    channel;
556         };  // end class ChannelStream
557         ChannelStream* GetChannelStream(ProtoChannel& theChannel);
558         void ReleaseChannelStream(ChannelStream* channelStream);
559 
560         /**
561          * @class GenericStream
562          *
563          * @brief This class helps manage notification for
564          * protoSockets and generic I/O descriptors
565          */
566 
567         class GenericStream : public Stream
568         {
569             public:
570                 GenericStream(Descriptor theDescriptor);
GetDescriptor()571                 Descriptor GetDescriptor() {return descriptor;}
SetDescriptor(Descriptor theDescriptor)572                 void SetDescriptor(Descriptor theDescriptor) {descriptor = theDescriptor;}
573                 void SetCallback(Callback* theCallback, const void* clientData = NULL)
574                 {
575                     callback = theCallback;
576                     client_data = clientData;
577                 }
OnEvent(Event theEvent)578                 void OnEvent(Event theEvent)
579                     {if (callback) callback(descriptor, theEvent, client_data);}
580             private:
581                 Descriptor  descriptor;
582                 Callback*   callback;
583                 const void* client_data;
584         };  // end class GenericStream
585 
586         bool InstallGenericStream(ProtoDispatcher::Descriptor   descriptor,
587                                   Callback*                     callback,
588                                   const void*                   userData,
589                                   Stream::Flag                  flag);
RemoveGenericStream(GenericStream * stream)590         void RemoveGenericStream(GenericStream* stream)
591             {ReleaseGenericStream(stream);}
592         GenericStream* GetGenericStream(Descriptor descriptor);
593         GenericStream* FindGenericStream(Descriptor descriptor) const;
594         void ReleaseGenericStream(GenericStream* stream);
595 
596 
597     // Members
598         SocketStream*            socket_stream_pool;
599         SocketStream*            socket_stream_list;
600         ChannelStream*           channel_stream_pool;
601         ChannelStream*           channel_stream_list;
602         GenericStream*           generic_stream_pool;
603         GenericStream*           generic_stream_list;
604 
605         volatile bool            run;
606         WaitStatus               wait_status;
607         int                      exit_code;
608         double                   timer_delay;  // ( timer_delay < 0.0) means INFINITY
609         bool                     precise_timing;
610         ThreadId                 thread_id;
611         bool                     priority_boost;
612         volatile bool            thread_started;
613         Mutex                    suspend_mutex;
614         Mutex                    signal_mutex;
615         ThreadId                 thread_master;
616         unsigned int             suspend_count;
617         unsigned int             signal_count;
618         Controller*              controller;
619 
620         bool                     prompt_set;
621         PromptCallback*          prompt_callback;
622         const void*              prompt_client_data;
623 #ifdef USE_TIMERFD
624         int                      timer_fd;
625 #endif // USE_TIMERFD
626 
627 #ifdef WIN32
628         int Win32AddStream(Stream& theStream, HANDLE theHandle);
629         void Win32RemoveStream(int index);
630         bool Win32IncreaseStreamArraySize();
631         static LRESULT CALLBACK MessageHandler(HWND hwnd, UINT message, WPARAM wParam, LPARAM lParam);
632 #ifdef _WIN32_WCE
633         static DWORD WINAPI DoThreadStart(LPVOID lpParameter);
634 #else
635         static unsigned int  __stdcall DoThreadStart(void* lpParameter);
636 #endif // if/else _WIN32_WCE
637 		ExitStatus				 exit_status;
638         enum {DEFAULT_ITEM_ARRAY_SIZE = 32};
639         HANDLE                   stream_handles_default[DEFAULT_ITEM_ARRAY_SIZE];
640         Stream*                  stream_ptrs_default[DEFAULT_ITEM_ARRAY_SIZE];
641         DWORD                    stream_array_size;
642         HANDLE*                  stream_handles_array;
643         Stream**                 stream_ptrs_array;
644         DWORD                    stream_count;
645         HWND                     msg_window;
646         HANDLE                   break_event;
647         GenericStream*           break_event_stream;
648         bool                     socket_io_pending;
649         HANDLE                   actual_thread_handle;
650 #else  // UNIX
651         static void* DoThreadStart(void* arg);
652         int                      exit_status;
653         fd_set                   input_set;
654         fd_set                   output_set;
655         int                      break_pipe_fd[2];
656 #endif // if/else WIN32/UNIX
657 
658 };  // end class ProtoDispatcher
659 
660 #endif // _PROTO_DISPATCHER
661