1 /********************************************************************* 2 * 3 * AUTHORIZATION TO USE AND DISTRIBUTE 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that: 7 * 8 * (1) source code distributions retain this paragraph in its entirety, 9 * 10 * (2) distributions including binary code include this paragraph in 11 * its entirety in the documentation or other materials provided 12 * with the distribution, and 13 * 14 * (3) all advertising materials mentioning features or use of this 15 * software display the following acknowledgment: 16 * 17 * "This product includes software written and developed 18 * by Brian Adamson and Joe Macker of the Naval Research 19 * Laboratory (NRL)." 20 * 21 * The name of NRL, the name(s) of NRL employee(s), or any entity 22 * of the United States Government may not be used to endorse or 23 * promote products derived from this software, nor does the 24 * inclusion of the NRL written and developed software directly or 25 * indirectly suggest NRL or United States Government endorsement 26 * of this product. 27 * 28 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND WITHOUT ANY EXPRESS OR 29 * IMPLIED WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED 30 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. 31 ********************************************************************/ 32 33 #ifndef _PROTO_DISPATCHER 34 #define _PROTO_DISPATCHER 35 36 #include "protoTimer.h" 37 #include "protoSocket.h" 38 #include "protoChannel.h" 39 40 #ifdef WIN32 41 #ifndef _WIN32_WCE 42 #include <process.h> // for _beginthreadex/_endthreadex 43 #endif // !_WIN32_WCE 44 #else 45 #include <pthread.h> 46 #include <unistd.h> // for read()pwd 47 #endif // if/else WIN32/UNIX 48 49 #ifdef LINUX 50 #ifdef USE_TIMERFD 51 #include <sys/timerfd.h> 52 #endif // USE_TIMERFD 53 #endif // LINUX 54 55 /** 56 * @class ProtoDispatcher 57 * 58 * @brief This class provides a core around which Unix and Win32 applications 59 * using Protolib can be implemented. 60 * 61 * It's "Run()" method provides a "main loop" which uses 62 * the "select()" system call on Unix and the 63 * similar "MsgWaitForMultipleObjectsEx()" system call on Win32. It 64 * is planned to eventually provide some built-in support for threading 65 * in the future (e.g. the ProtoDispatcher::Run() method might execute 66 * in a thread, dispatching events to a parent thread). 67 */ 68 69 70 71 /*********************** 72 ProtoDispatcher Update Notes 73 74 ProtoDispatcher is going to be updated to use epoll() and kevent() 75 APIs instead of select() for supporting Linux and BSD operating systems. 76 Note select() usage will still be supported, but the code is going to 77 be streamlined significantly as part of this update. Since there are 78 going to be added wait/dispatch paradigms put in place, the following 79 notes summarize these (i.e. "select()", "epoll()", "kevent()", and 80 "WaitForMultipleObjectsEx()" 81 82 83 select() - Each call to select() requires populated file descriptor sets 84 for "input", "output", and "exceptions". These "fdsets" need 85 to set prior to each call to select(). ProtoDispatcher can keep 86 reference input, output, and exception "fdsets" that are modified 87 with calls to Update{Channel | Socket | Generic}Notification() and 88 then FD_COPY() with each call to select(). 89 A threaded ProtoDispatcher can be "broken" from the "select()" based 90 wait state using its built-in "break pipe" (self-pipe set up for this) 91 92 Summary of ProtoDispatcher state/ops needed: 93 a) Keep list of active "streams" 94 b) for time efficiency, keep reference fdsets for FD_COPY() to select() 95 c) on dispatch, iterate through active "streams" and check FD_ISSET() 96 97 epoll() - The "epoll_ctl()" function is used to set/change/unset the list of 98 descriptors that epoll() monitors. Then epoll_wait() is a blocking 99 call that is used to get notification of events. For Linux, we 100 can use the timerfd, signalfd, and eventfd functions to set up 101 waitable descriptors for timing, signals, and notional "events". 102 We can use eventfd() instead of the "break pipe" on Linux. 103 104 Summary of ProtoDispatcher state/ops needed: 105 a) Keep list of active "streams" 106 b) changes to stream status, handled directly with epoll_ctl, no extra state needed 107 c) on dispatch, dereference associated "stream" using epoll_event.data.ptr 108 109 kevent() - A "changelist" is passed to each call of kevent() to optionally 110 modify the set of things monitored by a given kqueue(). The same 111 "struct kevent" array can be used as the "changelist" and the 112 "eventlist" to pass in changes and get pending events with one 113 call. Note that EVFILT_USER can be used to set up a sort 114 of user-defined "event" that a kevent() call in another thread 115 can "set" for the given kqueue descriptor. This can be used 116 instead of the "break pipe". 117 118 Summary of ProtoDispatcher state/ops needed: 119 a) Keep list of active "streams" 120 b) keep array of "struct kevent" to accumulate stream status changes 121 c) on dispatch, dereference associated "stream" using epoll_event.data.ptr 122 123 WaitForMultipleObjectsEx() - An array of handles is passed in. When one of 124 handles is "ready" or "set", the function returns 125 giving the value "index + WAIT_OBJECT0" for given 126 the array. ProtoDispatcher can keep a dual arrays 127 that are set/modified/unset with calls to 128 Update{Channel | Socket | Generic}Notification() 129 where one array is the array of handles required 130 and a corresponding array has pointers to the 131 corresponding ProtoDispatcher::Stream object. 132 For objects that have both input and/or output 133 handles, separate "index" and "outdex" values are 134 kept. 135 136 137 138 THE PLAN: 139 Step 1: Streamline current Unix/Win32 code that uses select()/WaitForMultipleObjectEx() 140 so we can cleanly incorporate the alternative epoll() / kevent() implementation 141 (e.g. make "thread break" consistent/modular, consolidate Channel/Socket/Generic stream 142 into a single data structure that is randomly accessible and rapidly 143 iterable (e.g. ProtoSortedTree). 144 145 Step 2: Extend Linux version to formally use timerfd, eventfd, and signalfd APIs with select()? 146 147 Step 3: Probably tackle "kevent()" implementation next? 148 149 Step 4: Finally, do "epoll()" implementation 150 151 152 ************************/ 153 154 155 156 157 /// Asynchronous event dispatcher class 158 class ProtoDispatcher : public ProtoTimerMgr, 159 public ProtoSocket::Notifier, 160 public ProtoChannel::Notifier 161 { 162 public: 163 // Construction 164 ProtoDispatcher(); 165 virtual ~ProtoDispatcher(); 166 void Destroy(); 167 168 // ProtoTimerMgr overrides ActivateTimer(ProtoTimer & theTimer)169 void ActivateTimer(ProtoTimer& theTimer) 170 { 171 // TBD - should we SignalThread() the thread to wake it up? 172 SuspendThread(); 173 ProtoTimerMgr::ActivateTimer(theTimer); 174 ResumeThread(); 175 } DeactivateTimer(ProtoTimer & theTimer)176 void DeactivateTimer(ProtoTimer& theTimer) 177 { 178 SuspendThread(); 179 ProtoTimerMgr::DeactivateTimer(theTimer); 180 ResumeThread(); 181 } 182 183 // Methods to manage generic input/output streams (pipes, files, devices, etc) 184 #ifdef WIN32 185 typedef HANDLE Descriptor; // WIN32 uses "HANDLE" type for descriptors 186 #else 187 typedef int Descriptor; // UNIX uses "int" type for descriptors 188 #endif // if/selse WIN32 189 static const Descriptor INVALID_DESCRIPTOR; 190 enum Event {EVENT_INPUT, EVENT_OUTPUT}; 191 typedef void (Callback)(ProtoDispatcher::Descriptor descriptor, 192 ProtoDispatcher::Event theEvent, 193 const void* userData); InstallGenericInput(ProtoDispatcher::Descriptor descriptor,ProtoDispatcher::Callback * callback,const void * clientData)194 bool InstallGenericInput(ProtoDispatcher::Descriptor descriptor, 195 ProtoDispatcher::Callback* callback, 196 const void* clientData) 197 { 198 SignalThread(); 199 bool result = InstallGenericStream(descriptor, callback, clientData, Stream::INPUT); 200 UnsignalThread(); 201 return result; 202 } RemoveGenericInput(Descriptor descriptor)203 void RemoveGenericInput(Descriptor descriptor) 204 { 205 SignalThread(); 206 GenericStream* stream = FindGenericStream(descriptor); 207 if (stream) 208 { 209 stream->UnsetFlag(Stream::INPUT); 210 if (!stream->HasFlags()) RemoveGenericStream(stream); 211 } 212 UnsignalThread(); 213 } InstallGenericOutput(ProtoDispatcher::Descriptor descriptor,ProtoDispatcher::Callback * callback,const void * clientData)214 bool InstallGenericOutput(ProtoDispatcher::Descriptor descriptor, 215 ProtoDispatcher::Callback* callback, 216 const void* clientData) 217 { 218 return InstallGenericStream(descriptor, callback, clientData, Stream::OUTPUT); 219 } RemoveGenericOutput(Descriptor descriptor)220 void RemoveGenericOutput(Descriptor descriptor) 221 { 222 GenericStream* stream = FindGenericStream(descriptor); 223 if (stream) 224 { 225 stream->UnsetFlag(Stream::OUTPUT); 226 if (!stream->HasFlags()) RemoveGenericStream(stream); 227 } 228 } 229 230 // Methods to set up and explicitly control ProtoDispatcher operation 231 /// Are there any pending timeouts/descriptors? IsPending()232 bool IsPending() 233 { 234 return (IsThreaded() || 235 (NULL != generic_stream_list) || 236 (NULL != socket_stream_list) || 237 (NULL != channel_stream_list) || 238 (ProtoTimerMgr::GetTimeRemaining() >= 0.0)); 239 } 240 /** 241 * This one can safely be called on a dispatcher _before_ 242 * the "StartThread()" or "Run()" method is called without 243 * affecting the current process priority 244 */ SetPriorityBoost(bool state)245 void SetPriorityBoost(bool state) 246 {priority_boost = state;} 247 248 /** 249 * This one will affect the current process priority if 250 * "StartThread()" has not already been called. 251 */ 252 bool BoostPriority(); 253 254 /** 255 * Block until next event (timer, socket, other, occurs) 256 * (Note this will block forever if !IsPending()) 257 */ 258 void Wait(); 259 /// Dispatch round of events 260 void Dispatch(); 261 262 /// OR this "main loop" can be used. 263 int Run(bool oneShot = false); 264 void Stop(int exitCode = 0); 265 IsRunning()266 bool IsRunning() const // TBD - deprecate IsRunning() method? 267 {return run;} 268 /** 269 * Controls whether time of day is polled for ultra-precise timing 270 * If "precise timing" is set to "true", the time of day is polled 271 * to achieve precise timer timeouts. However, this can consume 272 * considerable CPU resources. The default state is "false". 273 */ SetPreciseTiming(bool state)274 void SetPreciseTiming(bool state) {precise_timing = state;} 275 276 277 #ifdef WIN32 278 typedef CRITICAL_SECTION Mutex; Init(Mutex & m)279 static void Init(Mutex& m) {InitializeCriticalSection(&m);} Destroy(Mutex & m)280 static void Destroy(Mutex& m) {DeleteCriticalSection(&m);} Lock(Mutex & m)281 static void Lock(Mutex& m) {EnterCriticalSection(&m);} Unlock(Mutex & m)282 static void Unlock(Mutex& m) {LeaveCriticalSection(&m);} 283 #else 284 typedef pthread_mutex_t Mutex; Init(Mutex & m)285 static void Init(Mutex& m) {pthread_mutex_init(&m, NULL);} Destroy(Mutex & m)286 static void Destroy(Mutex& m) {pthread_mutex_destroy(&m);} Lock(Mutex & m)287 static void Lock(Mutex& m) {pthread_mutex_lock(&m);} Unlock(Mutex & m)288 static void Unlock(Mutex& m) {pthread_mutex_unlock(&m);} 289 #endif // if/else WIN32/UNIX 290 /** 291 * @class Controller 292 * 293 * @brief Handles dispatching for the ProtoDispatcher 294 * The ProtoDispatcher::Controller helper class allows for 295 * thread-safe synchronization when using the "StartThread()" 296 * mode. 297 */ 298 class Controller 299 { 300 public: 301 virtual ~Controller(); 302 void OnDispatch(); 303 304 protected: 305 Controller(ProtoDispatcher& theDispatcher); 306 307 private: 308 friend class ProtoDispatcher; 309 bool DoDispatch(); /// only called by ProtoDispatcher 310 virtual bool SignalDispatchReady() = 0; OnThreadStop()311 void OnThreadStop() 312 { 313 if (use_lock_a) 314 ProtoDispatcher::Unlock(lock_a); 315 else 316 ProtoDispatcher::Unlock(lock_b); 317 } 318 319 ProtoDispatcher& dispatcher; 320 Mutex lock_a; 321 Mutex lock_b; 322 bool use_lock_a; 323 }; // end class ProtoDispatcher::Controller() 324 friend class Controller; 325 326 /// OR the dispatcher can be run in a thread n(w/ optional Controller) 327 bool StartThread(bool priorityBoost = false, 328 ProtoDispatcher::Controller* theController = NULL); IsThreaded()329 bool IsThreaded() {return ((ThreadId)(NULL) != thread_id);} 330 331 /** 332 * (NOTE: If dispatcher instance is threaded, you MUST successfully call 333 * "SuspendThread()" before calling _any_ dispatcher methods or 334 * manipulating any objects (ProtoTimers, ProtoSockets, etc) 335 * which affect dispatcher operation!!!! 336 * (Call "ResumeThread()" to allow the dispatcher to continue) 337 * This can be used to create thread-safe protocols/applications 338 * which use a ProtoDispatcher for async timing and I/O) 339 */ 340 bool SuspendThread(); 341 void ResumeThread(); 342 343 // The "prompting" mechanism here is a way to cause a sleeping 344 // thread to wakeup and execute a bit of code as set by the 345 // "PromptCallback" 346 // (TBD) refine this prompting mechanism a little bit 347 // for example, some sort of waiting mechanism 348 // (e.g., the child could prompt its parent? add "promptData" option to PromptThread()? 349 // should be added to know when the work has been done. 350 // For example, a waitable descriptor. This way a "parent" 351 // ProtoApp or ProtoDispatcher could create/start a pool of 352 // waiting worker threads, assign them jobs, and reap the results 353 // upon completion ... 354 typedef void (PromptCallback)(const void* clientData); 355 /** 356 * Use this to set a PromptCallback function 357 * (should do this _before_ StartThread() is invoked 358 */ 359 void SetPromptCallback(PromptCallback* theCallback, const void* clientData = NULL) 360 { 361 prompt_callback = theCallback; 362 prompt_client_data = clientData; 363 } 364 365 /// Call this to force call of PromptCallback in thread's context PromptThread()366 bool PromptThread() 367 { 368 if (SuspendThread()) 369 { 370 prompt_set = true; // indicate prompt_callback should be called 371 if (!SignalThread()) 372 { 373 ResumeThread(); 374 return false; // suspend/signal thread 375 } 376 UnsignalThread(); 377 ResumeThread(); 378 return true; 379 } 380 else 381 { 382 return false; 383 } 384 } 385 386 387 #ifdef WIN32 388 /// WIN32 apps can call this to create a msg window if desired 389 bool Win32Init(); 390 void Win32Cleanup(); 391 #endif // WIN32 392 393 private: 394 bool SignalThread(); 395 void UnsignalThread(); WasSignaled()396 bool WasSignaled() 397 { 398 // Check for and reset "break" signal 399 if (IsThreaded()) 400 { 401 #ifdef WIN32 402 if ((WAIT_OBJECT_0 <= wait_status) && (wait_status < (WAIT_OBJECT_0 + stream_count))) 403 { 404 GenericStream* theStream = 405 static_cast<GenericStream*>(stream_ptrs_array[wait_status - WAIT_OBJECT_0]); 406 if (break_event_stream == theStream) 407 { 408 ResetEvent(break_event); 409 return true; 410 } 411 } 412 #else 413 if ((wait_status > 0) && (FD_ISSET(break_pipe_fd[0], &input_set))) 414 { 415 // Reset by emptying pipe 416 char byte[32]; 417 while (read(break_pipe_fd[0], byte, 32) > 0); 418 return true; 419 } 420 #endif // if/else WIN32/UNIX 421 } 422 return false; 423 } 424 425 426 /// Associated ProtoTimerMgrs will use this as needed UpdateSystemTimer(ProtoTimer::Command,double)427 bool UpdateSystemTimer(ProtoTimer::Command /*command*/, 428 double /* delay*/) 429 { 430 // ProtoDispatcher::Dispatch() queries ProtoTimerMgr::GetTimeRemaining() instead 431 // This wakes up the dispatcher thread as needed. 432 SignalThread(); 433 UnsignalThread(); 434 return true; 435 } 436 /// Associated ProtoSockets will use this as needed 437 bool UpdateSocketNotification(ProtoSocket& theSocket, 438 int notifyFlags); 439 440 /// Associated ProtoChannels will use this as needed 441 bool UpdateChannelNotification(ProtoChannel& theChannel, 442 int notifyFlags); 443 // Thread/Mutex stuff 444 #ifdef WIN32 445 typedef DWORD WaitStatus; 446 typedef DWORD ThreadId; GetCurrentThread()447 ThreadId GetCurrentThread() {return ::GetCurrentThreadId();} 448 #ifdef _WIN32_WCE 449 typedef DWORD ExitStatus; DoThreadExit(ExitStatus exitStatus)450 static void DoThreadExit(ExitStatus exitStatus) {ExitThread(exitStatus);} 451 #else 452 typedef unsigned int ExitStatus; DoThreadExit(ExitStatus exitStatus)453 static void DoThreadExit(ExitStatus exitStatus) {_endthreadex(exitStatus);} 454 #endif // if/else _WIN32_WCE GetExitStatus()455 ExitStatus GetExitStatus() 456 {return exit_status;} 457 #else // Unix 458 typedef int WaitStatus; 459 typedef pthread_t ThreadId; GetCurrentThread()460 static ThreadId GetCurrentThread() {return pthread_self();} 461 typedef void* ExitStatus; GetExitStatus()462 ExitStatus GetExitStatus() // note pthread uses a _pointer_ to the status value location 463 {return &exit_status;} DoThreadExit(ExitStatus exitStatus)464 static void DoThreadExit(ExitStatus exitStatus) {pthread_exit(exitStatus);} 465 #endif // if/else WIN32/UNIX 466 IsMyself()467 bool IsMyself() {return (GetCurrentThread() == thread_id);} 468 void DestroyThread(); 469 //static void DoThreadBreak(ProtoDispatcher::Descriptor descriptor, 470 // ProtoDispatcher::Event theEvent, 471 // const void* userData); 472 bool InstallBreak(); 473 void RemoveBreak(); 474 475 476 /** 477 * @class Stream 478 * 479 * @brief This class helps manage notification for 480 * protoSockets and generic I/O descriptors 481 */ 482 class Stream 483 { 484 public: 485 enum Type {GENERIC, SOCKET, CHANNEL}; 486 enum Flag {NONE = 0x00, INPUT = 0x01, OUTPUT = 0x02, EXCEPTION = 0x04}; 487 GetType()488 Type GetType() const {return type;} IsInput()489 bool IsInput() const {return FlagIsSet(INPUT);} IsOutput()490 bool IsOutput() const {return FlagIsSet(OUTPUT);} FlagIsSet(Flag theFlag)491 bool FlagIsSet(Flag theFlag) const {return (0 != (flags & theFlag));} SetFlag(Flag theFlag)492 void SetFlag(Flag theFlag) {flags |= theFlag;} UnsetFlag(Flag theFlag)493 void UnsetFlag(Flag theFlag) {flags &= ~theFlag;} SetFlags(int theFlags)494 void SetFlags(int theFlags) {flags = theFlags;} HasFlags()495 bool HasFlags() {return (0 != flags);} ClearFlags()496 void ClearFlags() {flags = 0;} 497 GetNext()498 Stream* GetNext() const {return next;} SetNext(Stream * stream)499 void SetNext(Stream* stream) {next = stream;} GetPrev()500 Stream* GetPrev() const {return prev;} SetPrev(Stream * stream)501 void SetPrev(Stream* stream) {prev = stream;} 502 #ifdef WIN32 GetIndex()503 int GetIndex() const {return index;} SetIndex(int theIndex)504 void SetIndex(int theIndex) {index = theIndex;} GetOutdex()505 int GetOutdex() const {return outdex;} SetOutdex(int theOutdex)506 void SetOutdex(int theOutdex) {outdex = theOutdex;} 507 #endif // WIN32 508 protected: 509 Stream(Type theType); 510 511 private: 512 Type type; 513 int flags; 514 #ifdef WIN32 515 int index; 516 int outdex; 517 #endif // WIN32 518 Stream* prev; 519 Stream* next; 520 }; // end class Stream 521 522 /** 523 * @class SocketStream 524 * 525 * @brief This class helps manage notification for 526 * protoSockets and generic I/O descriptors 527 */ 528 529 class SocketStream : public Stream 530 { 531 public: 532 SocketStream(ProtoSocket& theSocket); GetSocket()533 ProtoSocket& GetSocket() {return *socket;} SetSocket(ProtoSocket & theSocket)534 void SetSocket(ProtoSocket& theSocket) {socket = &theSocket;} 535 private: 536 ProtoSocket* socket; 537 }; // end class SocketStream 538 SocketStream* GetSocketStream(ProtoSocket& theSocket); 539 void ReleaseSocketStream(SocketStream* socketStream); 540 541 /** 542 * @class ChannelStream 543 * 544 * @brief This class helps manage notification for 545 * protoSockets and generic I/O descriptors 546 */ 547 548 class ChannelStream : public Stream 549 { 550 public: 551 ChannelStream(ProtoChannel& theChannel); GetChannel()552 ProtoChannel& GetChannel() {return *channel;} SetChannel(ProtoChannel & theChannel)553 void SetChannel(ProtoChannel& theChannel) {channel = &theChannel;} 554 private: 555 ProtoChannel* channel; 556 }; // end class ChannelStream 557 ChannelStream* GetChannelStream(ProtoChannel& theChannel); 558 void ReleaseChannelStream(ChannelStream* channelStream); 559 560 /** 561 * @class GenericStream 562 * 563 * @brief This class helps manage notification for 564 * protoSockets and generic I/O descriptors 565 */ 566 567 class GenericStream : public Stream 568 { 569 public: 570 GenericStream(Descriptor theDescriptor); GetDescriptor()571 Descriptor GetDescriptor() {return descriptor;} SetDescriptor(Descriptor theDescriptor)572 void SetDescriptor(Descriptor theDescriptor) {descriptor = theDescriptor;} 573 void SetCallback(Callback* theCallback, const void* clientData = NULL) 574 { 575 callback = theCallback; 576 client_data = clientData; 577 } OnEvent(Event theEvent)578 void OnEvent(Event theEvent) 579 {if (callback) callback(descriptor, theEvent, client_data);} 580 private: 581 Descriptor descriptor; 582 Callback* callback; 583 const void* client_data; 584 }; // end class GenericStream 585 586 bool InstallGenericStream(ProtoDispatcher::Descriptor descriptor, 587 Callback* callback, 588 const void* userData, 589 Stream::Flag flag); RemoveGenericStream(GenericStream * stream)590 void RemoveGenericStream(GenericStream* stream) 591 {ReleaseGenericStream(stream);} 592 GenericStream* GetGenericStream(Descriptor descriptor); 593 GenericStream* FindGenericStream(Descriptor descriptor) const; 594 void ReleaseGenericStream(GenericStream* stream); 595 596 597 // Members 598 SocketStream* socket_stream_pool; 599 SocketStream* socket_stream_list; 600 ChannelStream* channel_stream_pool; 601 ChannelStream* channel_stream_list; 602 GenericStream* generic_stream_pool; 603 GenericStream* generic_stream_list; 604 605 volatile bool run; 606 WaitStatus wait_status; 607 int exit_code; 608 double timer_delay; // ( timer_delay < 0.0) means INFINITY 609 bool precise_timing; 610 ThreadId thread_id; 611 bool priority_boost; 612 volatile bool thread_started; 613 Mutex suspend_mutex; 614 Mutex signal_mutex; 615 ThreadId thread_master; 616 unsigned int suspend_count; 617 unsigned int signal_count; 618 Controller* controller; 619 620 bool prompt_set; 621 PromptCallback* prompt_callback; 622 const void* prompt_client_data; 623 #ifdef USE_TIMERFD 624 int timer_fd; 625 #endif // USE_TIMERFD 626 627 #ifdef WIN32 628 int Win32AddStream(Stream& theStream, HANDLE theHandle); 629 void Win32RemoveStream(int index); 630 bool Win32IncreaseStreamArraySize(); 631 static LRESULT CALLBACK MessageHandler(HWND hwnd, UINT message, WPARAM wParam, LPARAM lParam); 632 #ifdef _WIN32_WCE 633 static DWORD WINAPI DoThreadStart(LPVOID lpParameter); 634 #else 635 static unsigned int __stdcall DoThreadStart(void* lpParameter); 636 #endif // if/else _WIN32_WCE 637 ExitStatus exit_status; 638 enum {DEFAULT_ITEM_ARRAY_SIZE = 32}; 639 HANDLE stream_handles_default[DEFAULT_ITEM_ARRAY_SIZE]; 640 Stream* stream_ptrs_default[DEFAULT_ITEM_ARRAY_SIZE]; 641 DWORD stream_array_size; 642 HANDLE* stream_handles_array; 643 Stream** stream_ptrs_array; 644 DWORD stream_count; 645 HWND msg_window; 646 HANDLE break_event; 647 GenericStream* break_event_stream; 648 bool socket_io_pending; 649 HANDLE actual_thread_handle; 650 #else // UNIX 651 static void* DoThreadStart(void* arg); 652 int exit_status; 653 fd_set input_set; 654 fd_set output_set; 655 int break_pipe_fd[2]; 656 #endif // if/else WIN32/UNIX 657 658 }; // end class ProtoDispatcher 659 660 #endif // _PROTO_DISPATCHER 661