1{ 2 3 fpAsync: Asynchronous event management for Free Pascal 4 Copyright (C) 2001-2005 by 5 Areca Systems GmbH / Sebastian Guenther, sg@freepascal.org 6 7 See the file COPYING.FPC, included in this distribution, 8 for details about the copyright. 9 10 This program is distributed in the hope that it will be useful, 11 but WITHOUT ANY WARRANTY; without even the implied warranty of 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 13} 14 15unit fpAsync; 16 17{$MODE objfpc} 18{$H+} 19 20interface 21 22uses SysUtils, Classes, libasync; 23 24type 25 26 TNotifyEvent = procedure(Sender: TObject) of object; 27 28 EAsyncError = class(Exception) 29 private 30 FErrorCode: TAsyncResult; 31 public 32 constructor Create(AErrorCode: TAsyncResult); 33 property ErrorCode: TAsyncResult read FErrorCode; 34 end; 35 36 TEventLoop = class 37 private 38 FData: TAsyncData; 39 FFirstNotifyData: Pointer; 40 function GetIsRunning: Boolean; 41 procedure SetIsRunning(AIsRunning: Boolean); 42 protected 43 procedure CheckResult(AResultCode: TAsyncResult); 44 public 45 constructor Create; 46 destructor Destroy; override; 47 function Handle: TAsyncHandle; 48 49 // Main loop control 50 procedure Run; 51 procedure Break; 52 53 // Timer support 54 function AddTimerCallback(AMSec: LongInt; APeriodic: Boolean; 55 ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer; 56 procedure RemoveTimerCallback(ATimer: TAsyncTimer); 57 function AddTimerNotify(AMSec: LongInt; APeriodic: Boolean; 58 ANotify: TNotifyEvent; ASender: TObject): Pointer; 59 procedure RemoveTimerNotify(AHandle: Pointer); 60 61 // I/O notification support (for files, sockets etc.) 62 procedure SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback; 63 AUserData: Pointer); 64 procedure ClearIOCallback(AHandle: Integer); 65 function SetIONotify(AHandle: Integer; ANotify: TNotifyEvent; 66 ASender: TObject): Pointer; 67 procedure ClearIONotify(AHandle: Pointer); 68 69 procedure SetDataAvailableCallback(AHandle: Integer; 70 ACallback: TAsyncCallback; AUserData: Pointer); 71 procedure ClearDataAvailableCallback(AHandle: Integer); 72 function SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent; 73 ASender: TObject): Pointer; 74 procedure ClearDataAvailableNotify(AHandle: Pointer); 75 76 procedure SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback; 77 AUserData: Pointer); 78 procedure ClearCanWriteCallback(AHandle: Integer); 79 function SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent; 80 ASender: TObject): Pointer; 81 procedure ClearCanWriteNotify(AHandle: Pointer); 82 83 84 class function TimerTicks: Int64; 85 86 // Properties 87 property IsRunning: Boolean read GetIsRunning write SetIsRunning; 88 end; 89 90 91// ------------------------------------------------------------------- 92// Asynchronous line reader 93// ------------------------------------------------------------------- 94 95 TLineNotify = procedure(const ALine: String) of object; 96 97 TGenericLineReader = class 98 protected 99 RealBuffer, FBuffer: PChar; 100 FBytesInBuffer: Integer; 101 FOnLine: TLineNotify; 102 InCallback, DoStopAndFree: Boolean; 103 104 function Read(var ABuffer; count: Integer): Integer; virtual; abstract; 105 procedure NoData; virtual; abstract; 106 107 public 108 destructor Destroy; override; 109 procedure Run; // Process as many lines as possible 110 111 property Buffer: PChar read FBuffer; 112 property BytesInBuffer: Integer read FBytesInBuffer; 113 property OnLine: TLineNotify read FOnLine write FOnLine; 114 end; 115 116 TAsyncStreamLineReader = class(TGenericLineReader) 117 protected 118 FEventLoop: TEventLoop; 119 FDataStream: TStream; 120 FBlockingStream: THandleStream; 121 FOnEOF: TNotifyEvent; 122 NotifyHandle: Pointer; 123 124 function Read(var ABuffer; count: Integer): Integer; override; 125 procedure NoData; override; 126 procedure StreamDataAvailable(UserData: TObject); 127 public 128 constructor Create(AEventLoop: TEventLoop; AStream: THandleStream); 129 constructor Create(AEventLoop: TEventLoop; ADataStream: TStream; 130 ABlockingStream: THandleStream); 131 destructor Destroy; override; 132 procedure StopAndFree; // Destroy instance after run 133 134 property EventLoop: TEventLoop read FEventLoop; 135 property DataStream: TStream read FDataStream; 136 property BlockingStream: THandleStream read FBlockingStream; 137 property OnEOF: TNotifyEvent read FOnEOF write FOnEOF; 138 end; 139 140 141// ------------------------------------------------------------------- 142// Asynchronous write buffers 143// ------------------------------------------------------------------- 144 145 TWriteBuffer = class(TStream) 146 protected 147 FBuffer: PChar; 148 FBytesInBuffer: Integer; 149 FBufferSent: Boolean; 150 FOnBufferEmpty: TNotifyEvent; 151 FOnBufferSent: TNotifyEvent; 152 InCallback: Boolean; 153 154 function Seek(Offset: LongInt; Origin: Word): LongInt; override; 155 function Write(const ABuffer; Count: LongInt): LongInt; override; 156 function DoRealWrite(const ABuffer; Count: Integer): Integer; virtual; abstract; 157 procedure WritingFailed; virtual; abstract; 158 procedure WantWrite; virtual; abstract; 159 procedure BufferEmpty; virtual; 160 public 161 EndOfLineMarker: String; 162 163 constructor Create; 164 destructor Destroy; override; 165 procedure WriteLine(const line: String); 166 procedure Run; // Write as many data as possible 167 168 property BytesInBuffer: Integer read FBytesInBuffer; 169 property BufferSent: Boolean read FBufferSent; 170 property OnBufferEmpty: TNotifyEvent read FOnBufferEmpty write FOnBufferEmpty; 171 property OnBufferSent: TNotifyEvent read FOnBufferSent write FOnBufferSent; 172 end; 173 174 175 TAsyncWriteStream = class(TWriteBuffer) 176 protected 177 FEventLoop: TEventLoop; 178 FDataStream: TStream; 179 FBlockingStream: THandleStream; 180 NotifyHandle: Pointer; 181 DoStopAndFree: Boolean; 182 183 function DoRealWrite(const ABuffer; Count: Integer): Integer; override; 184 procedure WritingFailed; override; 185 procedure WantWrite; override; 186 procedure CanWrite(UserData: TObject); 187 public 188 constructor Create(AEventLoop: TEventLoop; AStream: THandleStream); 189 constructor Create(AEventLoop: TEventLoop; 190 ADataStream: TStream; ABlockingStream: THandleStream); 191 destructor Destroy; override; 192 procedure StopAndFree; // Destroy instance after run 193 194 property EventLoop: TEventLoop read FEventLoop; 195 property DataStream: TStream read FDataStream; 196 property BlockingStream: THandleStream read FBlockingStream; 197 end; 198 199 200var 201 { All data written to a TWriteBuffer or descendant class will be written to 202 this stream as well: } 203 fpAsyncWriteBufferDebugStream: TStream; 204 205 206implementation 207 208type 209 PNotifyData = ^TNotifyData; 210 TNotifyData = record 211 Next: PNotifyData; 212 Notify: TNotifyEvent; 213 Sender: TObject; 214 case Boolean of 215 False: (TimerHandle: TAsyncTimer); 216 True: (FileHandle: LongInt); 217 end; 218 219 220procedure EventHandler(Data: Pointer); cdecl; 221begin 222 with PNotifyData(Data)^ do 223 Notify(Sender); 224end; 225 226 227function AddNotifyData(Obj: TEventLoop): PNotifyData; 228begin 229 New(Result); 230 Result^.Next := PNotifyData(Obj.FFirstNotifyData); 231 Obj.FFirstNotifyData := Result; 232end; 233 234procedure FreeNotifyData(Obj: TEventLoop; Data: PNotifyData); 235var 236 CurData, PrevData, NextData: PNotifyData; 237begin 238 PrevData := nil; 239 CurData := Obj.FFirstNotifyData; 240 while Assigned(CurData) do 241 begin 242 NextData := CurData^.Next; 243 if CurData = Data then 244 if Assigned(PrevData) then 245 PrevData^.Next := NextData 246 else 247 Obj.FFirstNotifyData := NextData; 248 PrevData := CurData; 249 CurData := NextData; 250 end; 251 252 Dispose(Data); 253end; 254 255 256constructor EAsyncError.Create(AErrorCode: TAsyncResult); 257begin 258 inherited Create(Format('Async I/O error %d', [Ord(AErrorCode)])); 259 FErrorCode := AErrorCode; 260end; 261 262 263constructor TEventLoop.Create; 264begin 265 asyncInit(Handle); 266end; 267 268destructor TEventLoop.Destroy; 269var 270 NotifyData, NextNotifyData: PNotifyData; 271begin 272 asyncFree(Handle); 273 NotifyData := FFirstNotifyData; 274 while Assigned(NotifyData) do 275 begin 276 NextNotifyData := NotifyData^.Next; 277 Dispose(NotifyData); 278 NotifyData := NextNotifyData; 279 end; 280end; 281 282function TEventLoop.Handle: TAsyncHandle; 283begin 284 Result := TAsyncHandle(Self); 285end; 286 287procedure TEventLoop.Run; 288begin 289 asyncRun(Handle); 290end; 291 292procedure TEventLoop.Break; 293begin 294 asyncBreak(Handle); 295end; 296 297function TEventLoop.AddTimerCallback(AMSec: LongInt; APeriodic: Boolean; 298 ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer; 299begin 300 Result := asyncAddTimer(Handle, AMSec, APeriodic, ACallback, AUserData); 301end; 302 303procedure TEventLoop.RemoveTimerCallback(ATimer: TAsyncTimer); 304begin 305 asyncRemoveTimer(Handle, ATimer); 306end; 307 308function TEventLoop.AddTimerNotify(AMSec: LongInt; APeriodic: Boolean; 309 ANotify: TNotifyEvent; ASender: TObject): Pointer; 310var 311 UserData: PNotifyData; 312begin 313 UserData := AddNotifyData(Self); 314 UserData^.Notify := ANotify; 315 UserData^.Sender := ASender; 316 UserData^.TimerHandle := 317 asyncAddTimer(Handle, AMSec, APeriodic, @EventHandler, UserData); 318 Result := UserData; 319end; 320 321procedure TEventLoop.RemoveTimerNotify(AHandle: Pointer); 322var 323 Data: PNotifyData; 324begin 325 Data := PNotifyData(AHandle); 326 asyncRemoveTimer(Handle, Data^.TimerHandle); 327 FreeNotifyData(Self, Data); 328end; 329 330procedure TEventLoop.SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback; 331 AUserData: Pointer); 332begin 333 CheckResult(asyncSetIOCallback(Handle, AHandle, ACallback, AUserData)); 334end; 335 336procedure TEventLoop.ClearIOCallback(AHandle: Integer); 337begin 338 asyncClearIOCallback(Handle, AHandle); 339end; 340 341function TEventLoop.SetIONotify(AHandle: Integer; ANotify: TNotifyEvent; 342 ASender: TObject): Pointer; 343var 344 UserData: PNotifyData; 345 ResultCode: TAsyncResult; 346begin 347 UserData := AddNotifyData(Self); 348 UserData^.Notify := ANotify; 349 UserData^.Sender := ASender; 350 UserData^.FileHandle := AHandle; 351 ResultCode := asyncSetIOCallback(Handle, AHandle, @EventHandler, UserData); 352 if ResultCode <> asyncOK then 353 begin 354 FreeNotifyData(Self, UserData); 355 raise EAsyncError.Create(ResultCode); 356 end else 357 Result := UserData; 358 {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetIONotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF} 359end; 360 361procedure TEventLoop.ClearIONotify(AHandle: Pointer); 362var 363 Data: PNotifyData; 364begin 365 Data := PNotifyData(AHandle); 366 {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearIONotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF} 367 asyncClearIOCallback(Handle, Data^.FileHandle); 368 FreeNotifyData(Self, Data); 369end; 370 371procedure TEventLoop.SetDataAvailableCallback(AHandle: Integer; ACallback: TAsyncCallback; 372 AUserData: Pointer); 373begin 374 CheckResult(asyncSetDataAvailableCallback(Handle, AHandle, 375 ACallback, AUserData)); 376end; 377 378procedure TEventLoop.ClearDataAvailableCallback(AHandle: Integer); 379begin 380 asyncClearDataAvailableCallback(Handle, AHandle); 381end; 382 383function TEventLoop.SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent; 384 ASender: TObject): Pointer; 385var 386 UserData: PNotifyData; 387 ResultCode: TAsyncResult; 388begin 389 UserData := AddNotifyData(Self); 390 UserData^.Notify := ANotify; 391 UserData^.Sender := ASender; 392 UserData^.FileHandle := AHandle; 393 ResultCode := asyncSetDataAvailableCallback(Handle, AHandle, 394 @EventHandler, UserData); 395 if ResultCode <> asyncOK then 396 begin 397 FreeNotifyData(Self, UserData); 398 raise EAsyncError.Create(ResultCode); 399 end else 400 Result := UserData; 401 {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetDataAvailableNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF} 402end; 403 404procedure TEventLoop.ClearDataAvailableNotify(AHandle: Pointer); 405var 406 Data: PNotifyData; 407begin 408 Data := PNotifyData(AHandle); 409 {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearDataAvailableNotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF} 410 asyncClearDataAvailableCallback(Handle, Data^.FileHandle); 411 FreeNotifyData(Self, Data); 412end; 413 414procedure TEventLoop.SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback; 415 AUserData: Pointer); 416begin 417 CheckResult(asyncSetCanWriteCallback(Handle, AHandle, ACallback, AUserData)); 418end; 419 420procedure TEventLoop.ClearCanWriteCallback(AHandle: Integer); 421begin 422 asyncClearCanWriteCallback(Handle, AHandle); 423end; 424 425function TEventLoop.SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent; 426 ASender: TObject): Pointer; 427var 428 UserData: PNotifyData; 429 ResultCode: TAsyncResult; 430begin 431 UserData := AddNotifyData(Self); 432 UserData^.Notify := ANotify; 433 UserData^.Sender := ASender; 434 UserData^.FileHandle := AHandle; 435 ResultCode := asyncSetCanWriteCallback(Handle, AHandle, 436 @EventHandler, UserData); 437 if ResultCode <> asyncOK then 438 begin 439 FreeNotifyData(Self, UserData); 440 raise EAsyncError.Create(ResultCode); 441 end else 442 Result := UserData; 443 {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetCanWriteNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF} 444end; 445 446procedure TEventLoop.ClearCanWriteNotify(AHandle: Pointer); 447var 448 Data: PNotifyData; 449begin 450 Data := PNotifyData(AHandle); 451 {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearCanWriteNotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF} 452 asyncClearCanWriteCallback(Handle, Data^.FileHandle); 453 FreeNotifyData(Self, Data); 454end; 455 456class function TEventLoop.TimerTicks: Int64; 457begin 458 Result := asyncGetTicks; 459end; 460 461procedure TEventLoop.CheckResult(AResultCode: TAsyncResult); 462begin 463 if AResultCode <> asyncOK then 464 raise EAsyncError.Create(AResultCode); 465end; 466 467function TEventLoop.GetIsRunning: Boolean; 468begin 469 Result := asyncIsRunning(Handle); 470end; 471 472procedure TEventLoop.SetIsRunning(AIsRunning: Boolean); 473begin 474 if IsRunning then 475 begin 476 if not AIsRunning then 477 Run; 478 end else 479 if AIsRunning then 480 Break; 481end; 482 483 484// ------------------------------------------------------------------- 485// TGenericLineReader 486// ------------------------------------------------------------------- 487 488destructor TGenericLineReader.Destroy; 489begin 490 if Assigned(RealBuffer) then 491 begin 492 FreeMem(RealBuffer); 493 RealBuffer := nil; 494 end; 495 inherited Destroy; 496end; 497 498procedure TGenericLineReader.Run; 499var 500 NewData: array[0..1023] of Byte; 501 p: PChar; 502 BytesRead, OldBufSize, CurBytesInBuffer, LastEndOfLine, i, LineLength: Integer; 503 line: String; 504 FirstRun: Boolean; 505begin 506 FirstRun := True; 507 while True do 508 begin 509 BytesRead := Read(NewData, SizeOf(NewData)); 510 //WriteLn('Linereader: ', BytesRead, ' bytes read'); 511 if BytesRead <= 0 then 512 begin 513 if FirstRun then 514 NoData; 515 break; 516 end; 517 FirstRun := False; 518 OldBufSize := FBytesInBuffer; 519 520 // Append the new received data to the read buffer 521 Inc(FBytesInBuffer, BytesRead); 522 ReallocMem(RealBuffer, FBytesInBuffer); 523 Move(NewData, RealBuffer[OldBufSize], BytesRead); 524 525 {Process all potential lines in the current buffer. Attention: FBuffer and 526 FBytesInBuffer MUST be updated for each line, as they can be accessed from 527 within the FOnLine handler!} 528 LastEndOfLine := 0; 529 if OldBufSize > 0 then 530 i := OldBufSize - 1 531 else 532 i := 0; 533 534 CurBytesInBuffer := FBytesInBuffer; 535 536 while i <= CurBytesInBuffer - 2 do 537 begin 538 if (RealBuffer[i] = #13) or (RealBuffer[i] = #10) then 539 begin 540 LineLength := i - LastEndOfLine; 541 SetLength(line, LineLength); 542 if LineLength > 0 then 543 Move(RealBuffer[LastEndOfLine], line[1], LineLength); 544 545 if ((RealBuffer[i] = #13) and (RealBuffer[i + 1] = #10)) or 546 ((RealBuffer[i] = #10) and (RealBuffer[i + 1] = #13)) then 547 Inc(i); 548 LastEndOfLine := i + 1; 549 550 if Assigned(FOnLine) then 551 begin 552 FBuffer := RealBuffer + LastEndOfLine; 553 FBytesInBuffer := CurBytesInBuffer - LastEndOfLine; 554 InCallback := True; 555 try 556 FOnLine(line); 557 finally 558 InCallback := False; 559 end; 560 // Check if <this> has been destroyed by FOnLine: 561 if DoStopAndFree then 562 exit; 563 end; 564 end; 565 Inc(i); 566 end; 567 568 FBytesInBuffer := CurBytesInBuffer; 569 570 if LastEndOfLine > 0 then 571 begin 572 // Remove all processed lines from the buffer 573 Dec(FBytesInBuffer, LastEndOfLine); 574 GetMem(p, FBytesInBuffer); 575 Move(RealBuffer[LastEndOfLine], p^, FBytesInBuffer); 576 if Assigned(RealBuffer) then 577 FreeMem(RealBuffer); 578 RealBuffer := p; 579 end; 580 FBuffer := RealBuffer; 581 end; 582end; 583 584 585// ------------------------------------------------------------------- 586// TAsyncStreamLineReader 587// ------------------------------------------------------------------- 588 589constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop; 590 AStream: THandleStream); 591begin 592 Self.Create(AEventLoop, AStream, AStream); 593end; 594 595constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop; 596 ADataStream: TStream; ABlockingStream: THandleStream); 597begin 598 ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream)); 599 600 inherited Create; 601 FEventLoop := AEventLoop; 602 FDataStream := ADataStream; 603 FBlockingStream := ABlockingStream; 604 NotifyHandle := EventLoop.SetDataAvailableNotify( 605 FBlockingStream.Handle, @StreamDataAvailable, nil); 606end; 607 608destructor TAsyncStreamLineReader.Destroy; 609begin 610 inherited Destroy; 611end; 612 613procedure TAsyncStreamLineReader.StopAndFree; 614begin 615 if InCallback then 616 begin 617 if Assigned(NotifyHandle) then 618 begin 619 EventLoop.ClearDataAvailableNotify(NotifyHandle); 620 NotifyHandle := nil; 621 end; 622 DoStopAndFree := True; 623 end else 624 Self.Free; 625end; 626 627function TAsyncStreamLineReader.Read(var ABuffer; count: Integer): Integer; 628begin 629 Result := FDataStream.Read(ABuffer, count); 630end; 631 632procedure TAsyncStreamLineReader.NoData; 633var 634 s: String; 635begin 636 if (FDataStream = FBlockingStream) or (FDataStream.Position = FDataStream.Size) then 637 begin 638 639 if (FBytesInBuffer > 0) and Assigned(FOnLine) then 640 begin 641 if FBuffer[FBytesInBuffer - 1] in [#13, #10] then 642 Dec(FBytesInBuffer); 643 SetLength(s, FBytesInBuffer); 644 Move(FBuffer^, s[1], FBytesInBuffer); 645 FOnLine(s); 646 end; 647 648 EventLoop.ClearDataAvailableNotify(NotifyHandle); 649 NotifyHandle := nil; 650 if Assigned(FOnEOF) then 651 begin 652 InCallback := True; 653 try 654 FOnEOF(Self); 655 finally 656 InCallback := False; 657 end; 658 end; 659 end; 660end; 661 662procedure TAsyncStreamLineReader.StreamDataAvailable(UserData: TObject); 663begin 664 Run; 665 if DoStopAndFree then 666 Free; 667end; 668 669 670// ------------------------------------------------------------------- 671// TWriteBuffer 672// ------------------------------------------------------------------- 673 674procedure TWriteBuffer.BufferEmpty; 675begin 676 if Assigned(FOnBufferEmpty) then 677 begin 678 InCallback := True; 679 FOnBufferEmpty(Self); 680 InCallback := False; 681 end; 682end; 683 684constructor TWriteBuffer.Create; 685begin 686 inherited Create; 687 688 FBuffer := nil; 689 FBytesInBuffer := 0; 690 EndOfLineMarker := #10; 691end; 692 693destructor TWriteBuffer.Destroy; 694begin 695 if Assigned(FBuffer) then 696 FreeMem(FBuffer); 697 inherited Destroy; 698end; 699 700function TWriteBuffer.Seek(Offset: LongInt; Origin: Word): LongInt; 701begin 702 if ((Offset = 0) and ((Origin = soFromCurrent) or (Origin = soFromEnd))) or 703 ((Offset = FBytesInBuffer) and (Origin = soFromBeginning)) then 704 Result := FBytesInBuffer 705 else 706 // !!!: No i18n for this string - solve this problem in the FCL?!? 707 raise EStreamError.Create('Invalid stream operation'); 708end; 709 710function TWriteBuffer.Write(const ABuffer; Count: LongInt): LongInt; 711begin 712 if Count > 0 then 713 begin 714 FBufferSent := False; 715 ReallocMem(FBuffer, FBytesInBuffer + Count); 716 Move(ABuffer, FBuffer[FBytesInBuffer], Count); 717 Inc(FBytesInBuffer, Count); 718 if Assigned(fpAsyncWriteBufferDebugStream) then 719 fpAsyncWriteBufferDebugStream.Write(ABuffer, Count); 720 WantWrite; 721 end; 722 Result := Count; 723end; 724 725procedure TWriteBuffer.WriteLine(const line: String); 726var 727 s: String; 728begin 729 s := line + EndOfLineMarker; 730 WriteBuffer(s[1], Length(s)); 731end; 732 733procedure TWriteBuffer.Run; 734var 735 Written: Integer; 736 NewBuf: PChar; 737 Failed: Boolean; 738begin 739 Failed := True; 740 repeat 741 if FBytesInBuffer = 0 then 742 begin 743 BufferEmpty; 744 if FBufferSent then 745 exit; 746 WantWrite; 747 exit; 748 end; 749 750 Written := DoRealWrite(FBuffer[0], FBytesInBuffer); 751 if Written > 0 then 752 begin 753 Failed := False; 754 Dec(FBytesInBuffer, Written); 755 GetMem(NewBuf, FBytesInBuffer); 756 Move(FBuffer[Written], NewBuf[0], FBytesInBuffer); 757 FreeMem(FBuffer); 758 FBuffer := NewBuf; 759 end; 760 until Written <= 0; 761 762 if Failed then 763 WritingFailed; 764end; 765 766 767// ------------------------------------------------------------------- 768// TAsyncWriteStream 769// ------------------------------------------------------------------- 770 771function TAsyncWriteStream.DoRealWrite(const ABuffer; Count: Integer): Integer; 772begin 773 Result := FDataStream.Write(ABuffer, count); 774end; 775 776procedure TAsyncWriteStream.WritingFailed; 777begin 778 if (FDataStream <> FBlockingStream) and Assigned(NotifyHandle) then 779 begin 780 EventLoop.ClearCanWriteNotify(NotifyHandle); 781 NotifyHandle := nil; 782 end; 783end; 784 785procedure TAsyncWriteStream.WantWrite; 786begin 787 if not Assigned(NotifyHandle) then 788 NotifyHandle := EventLoop.SetCanWriteNotify(FBlockingStream.Handle, 789 @CanWrite, nil); 790end; 791 792procedure TAsyncWriteStream.CanWrite(UserData: TObject); 793begin 794 if FBytesInBuffer = 0 then 795 begin 796 if Assigned(NotifyHandle) then 797 begin 798 EventLoop.ClearCanWriteNotify(NotifyHandle); 799 NotifyHandle := nil; 800 end; 801 FBufferSent := True; 802 if Assigned(FOnBufferSent) then 803 begin 804 InCallback := True; 805 try 806 FOnBufferSent(Self); 807 finally 808 InCallback := False; 809 end; 810 end; 811 end else 812 Run; 813 if DoStopAndFree then 814 Free; 815end; 816 817constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop; 818 AStream: THandleStream); 819begin 820 Self.Create(AEventLoop, AStream, AStream); 821end; 822 823constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop; 824 ADataStream: TStream; ABlockingStream: THandleStream); 825begin 826 ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream)); 827 828 inherited Create; 829 FEventLoop := AEventLoop; 830 FDataStream := ADataStream; 831 FBlockingStream := ABlockingStream; 832end; 833 834destructor TAsyncWriteStream.Destroy; 835begin 836 if Assigned(NotifyHandle) then 837 EventLoop.ClearCanWriteNotify(NotifyHandle); 838 inherited Destroy; 839end; 840 841procedure TAsyncWriteStream.StopAndFree; 842begin 843 if InCallback then 844 begin 845 if Assigned(NotifyHandle) then 846 begin 847 EventLoop.ClearCanWriteNotify(NotifyHandle); 848 NotifyHandle := nil; 849 end; 850 DoStopAndFree := True; 851 end else 852 Self.Free; 853end; 854 855 856end. 857