1{
2
3    fpAsync: Asynchronous event management for Free Pascal
4    Copyright (C) 2001-2005 by
5      Areca Systems GmbH / Sebastian Guenther, sg@freepascal.org
6
7    See the file COPYING.FPC, included in this distribution,
8    for details about the copyright.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13}
14
15unit fpAsync;
16
17{$MODE objfpc}
18{$H+}
19
20interface
21
22uses SysUtils, Classes, libasync;
23
24type
25
26  TNotifyEvent = procedure(Sender: TObject) of object;
27
28  EAsyncError = class(Exception)
29  private
30    FErrorCode: TAsyncResult;
31  public
32    constructor Create(AErrorCode: TAsyncResult);
33    property ErrorCode: TAsyncResult read FErrorCode;
34  end;
35
36  TEventLoop = class
37  private
38    FData: TAsyncData;
39    FFirstNotifyData: Pointer;
40    function GetIsRunning: Boolean;
41    procedure SetIsRunning(AIsRunning: Boolean);
42  protected
43    procedure CheckResult(AResultCode: TAsyncResult);
44  public
45    constructor Create;
46    destructor Destroy; override;
47    function Handle: TAsyncHandle;
48
49    // Main loop control
50    procedure Run;
51    procedure Break;
52
53    // Timer support
54    function AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
55      ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
56    procedure RemoveTimerCallback(ATimer: TAsyncTimer);
57    function AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
58      ANotify: TNotifyEvent; ASender: TObject): Pointer;
59    procedure RemoveTimerNotify(AHandle: Pointer);
60
61    // I/O notification support (for files, sockets etc.)
62    procedure SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
63      AUserData: Pointer);
64    procedure ClearIOCallback(AHandle: Integer);
65    function SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
66      ASender: TObject): Pointer;
67    procedure ClearIONotify(AHandle: Pointer);
68
69    procedure SetDataAvailableCallback(AHandle: Integer;
70      ACallback: TAsyncCallback; AUserData: Pointer);
71    procedure ClearDataAvailableCallback(AHandle: Integer);
72    function SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
73      ASender: TObject): Pointer;
74    procedure ClearDataAvailableNotify(AHandle: Pointer);
75
76    procedure SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
77      AUserData: Pointer);
78    procedure ClearCanWriteCallback(AHandle: Integer);
79    function SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
80      ASender: TObject): Pointer;
81    procedure ClearCanWriteNotify(AHandle: Pointer);
82
83
84    class function TimerTicks: Int64;
85
86    // Properties
87    property IsRunning: Boolean read GetIsRunning write SetIsRunning;
88  end;
89
90
91// -------------------------------------------------------------------
92//   Asynchronous line reader
93// -------------------------------------------------------------------
94
95  TLineNotify = procedure(const ALine: String) of object;
96
97  TGenericLineReader = class
98  protected
99    RealBuffer, FBuffer: PChar;
100    FBytesInBuffer: Integer;
101    FOnLine: TLineNotify;
102    InCallback, DoStopAndFree: Boolean;
103
104    function  Read(var ABuffer; count: Integer): Integer; virtual; abstract;
105    procedure NoData; virtual; abstract;
106
107  public
108    destructor Destroy; override;
109    procedure Run;              // Process as many lines as possible
110
111    property Buffer: PChar read FBuffer;
112    property BytesInBuffer: Integer read FBytesInBuffer;
113    property OnLine: TLineNotify read FOnLine write FOnLine;
114  end;
115
116  TAsyncStreamLineReader = class(TGenericLineReader)
117  protected
118    FEventLoop: TEventLoop;
119    FDataStream: TStream;
120    FBlockingStream: THandleStream;
121    FOnEOF: TNotifyEvent;
122    NotifyHandle: Pointer;
123
124    function  Read(var ABuffer; count: Integer): Integer; override;
125    procedure NoData; override;
126    procedure StreamDataAvailable(UserData: TObject);
127  public
128    constructor Create(AEventLoop: TEventLoop; AStream: THandleStream);
129    constructor Create(AEventLoop: TEventLoop; ADataStream: TStream;
130      ABlockingStream: THandleStream);
131    destructor Destroy; override;
132    procedure StopAndFree;      // Destroy instance after run
133
134    property EventLoop: TEventLoop read FEventLoop;
135    property DataStream: TStream read FDataStream;
136    property BlockingStream: THandleStream read FBlockingStream;
137    property OnEOF: TNotifyEvent read FOnEOF write FOnEOF;
138  end;
139
140
141// -------------------------------------------------------------------
142//   Asynchronous write buffers
143// -------------------------------------------------------------------
144
145  TWriteBuffer = class(TStream)
146  protected
147    FBuffer: PChar;
148    FBytesInBuffer: Integer;
149    FBufferSent: Boolean;
150    FOnBufferEmpty: TNotifyEvent;
151    FOnBufferSent: TNotifyEvent;
152    InCallback: Boolean;
153
154    function  Seek(Offset: LongInt; Origin: Word): LongInt; override;
155    function  Write(const ABuffer; Count: LongInt): LongInt; override;
156    function  DoRealWrite(const ABuffer; Count: Integer): Integer; virtual; abstract;
157    procedure WritingFailed; virtual; abstract;
158    procedure WantWrite; virtual; abstract;
159    procedure BufferEmpty; virtual;
160  public
161    EndOfLineMarker: String;
162
163    constructor Create;
164    destructor Destroy; override;
165    procedure WriteLine(const line: String);
166    procedure Run;              // Write as many data as possible
167
168    property BytesInBuffer: Integer read FBytesInBuffer;
169    property BufferSent: Boolean read FBufferSent;
170    property OnBufferEmpty: TNotifyEvent read FOnBufferEmpty write FOnBufferEmpty;
171    property OnBufferSent: TNotifyEvent read FOnBufferSent write FOnBufferSent;
172  end;
173
174
175  TAsyncWriteStream = class(TWriteBuffer)
176  protected
177    FEventLoop: TEventLoop;
178    FDataStream: TStream;
179    FBlockingStream: THandleStream;
180    NotifyHandle: Pointer;
181    DoStopAndFree: Boolean;
182
183    function  DoRealWrite(const ABuffer; Count: Integer): Integer; override;
184    procedure WritingFailed; override;
185    procedure WantWrite; override;
186    procedure CanWrite(UserData: TObject);
187  public
188    constructor Create(AEventLoop: TEventLoop; AStream: THandleStream);
189    constructor Create(AEventLoop: TEventLoop;
190      ADataStream: TStream; ABlockingStream: THandleStream);
191    destructor Destroy; override;
192    procedure StopAndFree;      // Destroy instance after run
193
194    property EventLoop: TEventLoop read FEventLoop;
195    property DataStream: TStream read FDataStream;
196    property BlockingStream: THandleStream read FBlockingStream;
197  end;
198
199
200var
201  { All data written to a TWriteBuffer or descendant class will be written to
202    this stream as well: }
203  fpAsyncWriteBufferDebugStream: TStream;
204
205
206implementation
207
208type
209  PNotifyData = ^TNotifyData;
210  TNotifyData = record
211    Next: PNotifyData;
212    Notify: TNotifyEvent;
213    Sender: TObject;
214    case Boolean of
215      False: (TimerHandle: TAsyncTimer);
216      True: (FileHandle: LongInt);
217  end;
218
219
220procedure EventHandler(Data: Pointer); cdecl;
221begin
222  with PNotifyData(Data)^ do
223    Notify(Sender);
224end;
225
226
227function AddNotifyData(Obj: TEventLoop): PNotifyData;
228begin
229  New(Result);
230  Result^.Next := PNotifyData(Obj.FFirstNotifyData);
231  Obj.FFirstNotifyData := Result;
232end;
233
234procedure FreeNotifyData(Obj: TEventLoop; Data: PNotifyData);
235var
236  CurData, PrevData, NextData: PNotifyData;
237begin
238  PrevData := nil;
239  CurData := Obj.FFirstNotifyData;
240  while Assigned(CurData) do
241  begin
242    NextData := CurData^.Next;
243    if CurData = Data then
244      if Assigned(PrevData) then
245        PrevData^.Next := NextData
246      else
247        Obj.FFirstNotifyData := NextData;
248    PrevData := CurData;
249    CurData := NextData;
250  end;
251
252  Dispose(Data);
253end;
254
255
256constructor EAsyncError.Create(AErrorCode: TAsyncResult);
257begin
258  inherited Create(Format('Async I/O error %d', [Ord(AErrorCode)]));
259  FErrorCode := AErrorCode;
260end;
261
262
263constructor TEventLoop.Create;
264begin
265  asyncInit(Handle);
266end;
267
268destructor TEventLoop.Destroy;
269var
270  NotifyData, NextNotifyData: PNotifyData;
271begin
272  asyncFree(Handle);
273  NotifyData := FFirstNotifyData;
274  while Assigned(NotifyData) do
275  begin
276    NextNotifyData := NotifyData^.Next;
277    Dispose(NotifyData);
278    NotifyData := NextNotifyData;
279  end;
280end;
281
282function TEventLoop.Handle: TAsyncHandle;
283begin
284  Result := TAsyncHandle(Self);
285end;
286
287procedure TEventLoop.Run;
288begin
289  asyncRun(Handle);
290end;
291
292procedure TEventLoop.Break;
293begin
294  asyncBreak(Handle);
295end;
296
297function TEventLoop.AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
298  ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
299begin
300  Result := asyncAddTimer(Handle, AMSec, APeriodic, ACallback, AUserData);
301end;
302
303procedure TEventLoop.RemoveTimerCallback(ATimer: TAsyncTimer);
304begin
305  asyncRemoveTimer(Handle, ATimer);
306end;
307
308function TEventLoop.AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
309  ANotify: TNotifyEvent; ASender: TObject): Pointer;
310var
311  UserData: PNotifyData;
312begin
313  UserData := AddNotifyData(Self);
314  UserData^.Notify := ANotify;
315  UserData^.Sender := ASender;
316  UserData^.TimerHandle :=
317    asyncAddTimer(Handle, AMSec, APeriodic, @EventHandler, UserData);
318  Result := UserData;
319end;
320
321procedure TEventLoop.RemoveTimerNotify(AHandle: Pointer);
322var
323  Data: PNotifyData;
324begin
325  Data := PNotifyData(AHandle);
326  asyncRemoveTimer(Handle, Data^.TimerHandle);
327  FreeNotifyData(Self, Data);
328end;
329
330procedure TEventLoop.SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
331  AUserData: Pointer);
332begin
333  CheckResult(asyncSetIOCallback(Handle, AHandle, ACallback, AUserData));
334end;
335
336procedure TEventLoop.ClearIOCallback(AHandle: Integer);
337begin
338  asyncClearIOCallback(Handle, AHandle);
339end;
340
341function TEventLoop.SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
342  ASender: TObject): Pointer;
343var
344  UserData: PNotifyData;
345  ResultCode: TAsyncResult;
346begin
347  UserData := AddNotifyData(Self);
348  UserData^.Notify := ANotify;
349  UserData^.Sender := ASender;
350  UserData^.FileHandle := AHandle;
351  ResultCode := asyncSetIOCallback(Handle, AHandle, @EventHandler, UserData);
352  if ResultCode <> asyncOK then
353  begin
354    FreeNotifyData(Self, UserData);
355    raise EAsyncError.Create(ResultCode);
356  end else
357    Result := UserData;
358  {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetIONotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
359end;
360
361procedure TEventLoop.ClearIONotify(AHandle: Pointer);
362var
363  Data: PNotifyData;
364begin
365  Data := PNotifyData(AHandle);
366  {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearIONotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
367  asyncClearIOCallback(Handle, Data^.FileHandle);
368  FreeNotifyData(Self, Data);
369end;
370
371procedure TEventLoop.SetDataAvailableCallback(AHandle: Integer; ACallback: TAsyncCallback;
372  AUserData: Pointer);
373begin
374  CheckResult(asyncSetDataAvailableCallback(Handle, AHandle,
375    ACallback, AUserData));
376end;
377
378procedure TEventLoop.ClearDataAvailableCallback(AHandle: Integer);
379begin
380  asyncClearDataAvailableCallback(Handle, AHandle);
381end;
382
383function TEventLoop.SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
384  ASender: TObject): Pointer;
385var
386  UserData: PNotifyData;
387  ResultCode: TAsyncResult;
388begin
389  UserData := AddNotifyData(Self);
390  UserData^.Notify := ANotify;
391  UserData^.Sender := ASender;
392  UserData^.FileHandle := AHandle;
393  ResultCode := asyncSetDataAvailableCallback(Handle, AHandle,
394    @EventHandler, UserData);
395  if ResultCode <> asyncOK then
396  begin
397    FreeNotifyData(Self, UserData);
398    raise EAsyncError.Create(ResultCode);
399  end else
400    Result := UserData;
401  {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetDataAvailableNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
402end;
403
404procedure TEventLoop.ClearDataAvailableNotify(AHandle: Pointer);
405var
406  Data: PNotifyData;
407begin
408  Data := PNotifyData(AHandle);
409  {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearDataAvailableNotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
410  asyncClearDataAvailableCallback(Handle, Data^.FileHandle);
411  FreeNotifyData(Self, Data);
412end;
413
414procedure TEventLoop.SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
415  AUserData: Pointer);
416begin
417  CheckResult(asyncSetCanWriteCallback(Handle, AHandle, ACallback, AUserData));
418end;
419
420procedure TEventLoop.ClearCanWriteCallback(AHandle: Integer);
421begin
422  asyncClearCanWriteCallback(Handle, AHandle);
423end;
424
425function TEventLoop.SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
426  ASender: TObject): Pointer;
427var
428  UserData: PNotifyData;
429  ResultCode: TAsyncResult;
430begin
431  UserData := AddNotifyData(Self);
432  UserData^.Notify := ANotify;
433  UserData^.Sender := ASender;
434  UserData^.FileHandle := AHandle;
435  ResultCode := asyncSetCanWriteCallback(Handle, AHandle,
436    @EventHandler, UserData);
437  if ResultCode <> asyncOK then
438  begin
439    FreeNotifyData(Self, UserData);
440    raise EAsyncError.Create(ResultCode);
441  end else
442    Result := UserData;
443  {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetCanWriteNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
444end;
445
446procedure TEventLoop.ClearCanWriteNotify(AHandle: Pointer);
447var
448  Data: PNotifyData;
449begin
450  Data := PNotifyData(AHandle);
451  {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearCanWriteNotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
452  asyncClearCanWriteCallback(Handle, Data^.FileHandle);
453  FreeNotifyData(Self, Data);
454end;
455
456class function TEventLoop.TimerTicks: Int64;
457begin
458  Result := asyncGetTicks;
459end;
460
461procedure TEventLoop.CheckResult(AResultCode: TAsyncResult);
462begin
463  if AResultCode <> asyncOK then
464    raise EAsyncError.Create(AResultCode);
465end;
466
467function TEventLoop.GetIsRunning: Boolean;
468begin
469  Result := asyncIsRunning(Handle);
470end;
471
472procedure TEventLoop.SetIsRunning(AIsRunning: Boolean);
473begin
474  if IsRunning then
475  begin
476    if not AIsRunning then
477      Run;
478  end else
479    if AIsRunning then
480      Break;
481end;
482
483
484// -------------------------------------------------------------------
485//   TGenericLineReader
486// -------------------------------------------------------------------
487
488destructor TGenericLineReader.Destroy;
489begin
490  if Assigned(RealBuffer) then
491  begin
492    FreeMem(RealBuffer);
493    RealBuffer := nil;
494  end;
495  inherited Destroy;
496end;
497
498procedure TGenericLineReader.Run;
499var
500  NewData: array[0..1023] of Byte;
501  p: PChar;
502  BytesRead, OldBufSize, CurBytesInBuffer, LastEndOfLine, i, LineLength: Integer;
503  line: String;
504  FirstRun: Boolean;
505begin
506  FirstRun := True;
507  while True do
508  begin
509    BytesRead := Read(NewData, SizeOf(NewData));
510    //WriteLn('Linereader: ', BytesRead, ' bytes read');
511    if BytesRead <= 0 then
512    begin
513      if FirstRun then
514        NoData;
515      break;
516    end;
517    FirstRun := False;
518    OldBufSize := FBytesInBuffer;
519
520    // Append the new received data to the read buffer
521    Inc(FBytesInBuffer, BytesRead);
522    ReallocMem(RealBuffer, FBytesInBuffer);
523    Move(NewData, RealBuffer[OldBufSize], BytesRead);
524
525    {Process all potential lines in the current buffer. Attention: FBuffer and
526     FBytesInBuffer MUST be updated for each line, as they can be accessed from
527     within the FOnLine handler!}
528    LastEndOfLine := 0;
529    if OldBufSize > 0 then
530      i := OldBufSize - 1
531    else
532      i := 0;
533
534    CurBytesInBuffer := FBytesInBuffer;
535
536    while i <= CurBytesInBuffer - 2 do
537    begin
538      if (RealBuffer[i] = #13) or (RealBuffer[i] = #10) then
539      begin
540        LineLength := i - LastEndOfLine;
541        SetLength(line, LineLength);
542        if LineLength > 0 then
543          Move(RealBuffer[LastEndOfLine], line[1], LineLength);
544
545        if ((RealBuffer[i] = #13) and (RealBuffer[i + 1] = #10)) or
546           ((RealBuffer[i] = #10) and (RealBuffer[i + 1] = #13)) then
547          Inc(i);
548        LastEndOfLine := i + 1;
549
550        if Assigned(FOnLine) then
551	begin
552          FBuffer := RealBuffer + LastEndOfLine;
553          FBytesInBuffer := CurBytesInBuffer - LastEndOfLine;
554	  InCallback := True;
555	  try
556            FOnLine(line);
557	  finally
558	    InCallback := False;
559	  end;
560          // Check if <this> has been destroyed by FOnLine:
561          if DoStopAndFree then
562	    exit;
563        end;
564      end;
565      Inc(i);
566    end;
567
568    FBytesInBuffer := CurBytesInBuffer;
569
570    if LastEndOfLine > 0 then
571    begin
572      // Remove all processed lines from the buffer
573      Dec(FBytesInBuffer, LastEndOfLine);
574      GetMem(p, FBytesInBuffer);
575      Move(RealBuffer[LastEndOfLine], p^, FBytesInBuffer);
576      if Assigned(RealBuffer) then
577        FreeMem(RealBuffer);
578      RealBuffer := p;
579    end;
580    FBuffer := RealBuffer;
581  end;
582end;
583
584
585// -------------------------------------------------------------------
586//   TAsyncStreamLineReader
587// -------------------------------------------------------------------
588
589constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
590  AStream: THandleStream);
591begin
592  Self.Create(AEventLoop, AStream, AStream);
593end;
594
595constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
596  ADataStream: TStream; ABlockingStream: THandleStream);
597begin
598  ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
599
600  inherited Create;
601  FEventLoop := AEventLoop;
602  FDataStream := ADataStream;
603  FBlockingStream := ABlockingStream;
604  NotifyHandle := EventLoop.SetDataAvailableNotify(
605    FBlockingStream.Handle, @StreamDataAvailable, nil);
606end;
607
608destructor TAsyncStreamLineReader.Destroy;
609begin
610  inherited Destroy;
611end;
612
613procedure TAsyncStreamLineReader.StopAndFree;
614begin
615  if InCallback then
616  begin
617    if Assigned(NotifyHandle) then
618    begin
619      EventLoop.ClearDataAvailableNotify(NotifyHandle);
620      NotifyHandle := nil;
621    end;
622    DoStopAndFree := True;
623  end else
624    Self.Free;
625end;
626
627function TAsyncStreamLineReader.Read(var ABuffer; count: Integer): Integer;
628begin
629  Result := FDataStream.Read(ABuffer, count);
630end;
631
632procedure TAsyncStreamLineReader.NoData;
633var
634  s: String;
635begin
636  if (FDataStream = FBlockingStream) or (FDataStream.Position = FDataStream.Size) then
637  begin
638
639    if (FBytesInBuffer > 0) and Assigned(FOnLine) then
640    begin
641      if FBuffer[FBytesInBuffer - 1] in [#13, #10] then
642        Dec(FBytesInBuffer);
643      SetLength(s, FBytesInBuffer);
644      Move(FBuffer^, s[1], FBytesInBuffer);
645      FOnLine(s);
646    end;
647
648    EventLoop.ClearDataAvailableNotify(NotifyHandle);
649    NotifyHandle := nil;
650    if Assigned(FOnEOF) then
651    begin
652      InCallback := True;
653      try
654        FOnEOF(Self);
655      finally
656        InCallback := False;
657      end;
658    end;
659  end;
660end;
661
662procedure TAsyncStreamLineReader.StreamDataAvailable(UserData: TObject);
663begin
664  Run;
665  if DoStopAndFree then
666    Free;
667end;
668
669
670// -------------------------------------------------------------------
671//   TWriteBuffer
672// -------------------------------------------------------------------
673
674procedure TWriteBuffer.BufferEmpty;
675begin
676  if Assigned(FOnBufferEmpty) then
677  begin
678    InCallback := True;
679    FOnBufferEmpty(Self);
680    InCallback := False;
681  end;
682end;
683
684constructor TWriteBuffer.Create;
685begin
686  inherited Create;
687
688  FBuffer := nil;
689  FBytesInBuffer := 0;
690  EndOfLineMarker := #10;
691end;
692
693destructor TWriteBuffer.Destroy;
694begin
695  if Assigned(FBuffer) then
696    FreeMem(FBuffer);
697  inherited Destroy;
698end;
699
700function TWriteBuffer.Seek(Offset: LongInt; Origin: Word): LongInt;
701begin
702  if ((Offset = 0) and ((Origin = soFromCurrent) or (Origin = soFromEnd))) or
703     ((Offset = FBytesInBuffer) and (Origin = soFromBeginning)) then
704    Result := FBytesInBuffer
705  else
706    // !!!: No i18n for this string - solve this problem in the FCL?!?
707    raise EStreamError.Create('Invalid stream operation');
708end;
709
710function TWriteBuffer.Write(const ABuffer; Count: LongInt): LongInt;
711begin
712  if Count > 0 then
713  begin
714    FBufferSent := False;
715    ReallocMem(FBuffer, FBytesInBuffer + Count);
716    Move(ABuffer, FBuffer[FBytesInBuffer], Count);
717    Inc(FBytesInBuffer, Count);
718    if Assigned(fpAsyncWriteBufferDebugStream) then
719      fpAsyncWriteBufferDebugStream.Write(ABuffer, Count);
720    WantWrite;
721  end;
722  Result := Count;
723end;
724
725procedure TWriteBuffer.WriteLine(const line: String);
726var
727  s: String;
728begin
729  s := line + EndOfLineMarker;
730  WriteBuffer(s[1], Length(s));
731end;
732
733procedure TWriteBuffer.Run;
734var
735  Written: Integer;
736  NewBuf: PChar;
737  Failed: Boolean;
738begin
739  Failed := True;
740  repeat
741    if FBytesInBuffer = 0 then
742    begin
743      BufferEmpty;
744      if FBufferSent then
745        exit;
746      WantWrite;
747      exit;
748    end;
749
750    Written := DoRealWrite(FBuffer[0], FBytesInBuffer);
751    if Written > 0 then
752    begin
753      Failed := False;
754      Dec(FBytesInBuffer, Written);
755      GetMem(NewBuf, FBytesInBuffer);
756      Move(FBuffer[Written], NewBuf[0], FBytesInBuffer);
757      FreeMem(FBuffer);
758      FBuffer := NewBuf;
759    end;
760  until Written <= 0;
761
762  if Failed then
763    WritingFailed;
764end;
765
766
767// -------------------------------------------------------------------
768//   TAsyncWriteStream
769// -------------------------------------------------------------------
770
771function TAsyncWriteStream.DoRealWrite(const ABuffer; Count: Integer): Integer;
772begin
773  Result := FDataStream.Write(ABuffer, count);
774end;
775
776procedure TAsyncWriteStream.WritingFailed;
777begin
778  if (FDataStream <> FBlockingStream) and Assigned(NotifyHandle) then
779  begin
780    EventLoop.ClearCanWriteNotify(NotifyHandle);
781    NotifyHandle := nil;
782  end;
783end;
784
785procedure TAsyncWriteStream.WantWrite;
786begin
787  if not Assigned(NotifyHandle) then
788    NotifyHandle := EventLoop.SetCanWriteNotify(FBlockingStream.Handle,
789      @CanWrite, nil);
790end;
791
792procedure TAsyncWriteStream.CanWrite(UserData: TObject);
793begin
794  if FBytesInBuffer = 0 then
795  begin
796    if Assigned(NotifyHandle) then
797    begin
798      EventLoop.ClearCanWriteNotify(NotifyHandle);
799      NotifyHandle := nil;
800    end;
801    FBufferSent := True;
802    if Assigned(FOnBufferSent) then
803    begin
804      InCallback := True;
805      try
806        FOnBufferSent(Self);
807      finally
808        InCallback := False;
809      end;
810    end;
811  end else
812    Run;
813  if DoStopAndFree then
814    Free;
815end;
816
817constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
818  AStream: THandleStream);
819begin
820  Self.Create(AEventLoop, AStream, AStream);
821end;
822
823constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
824  ADataStream: TStream; ABlockingStream: THandleStream);
825begin
826  ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));
827
828  inherited Create;
829  FEventLoop := AEventLoop;
830  FDataStream := ADataStream;
831  FBlockingStream := ABlockingStream;
832end;
833
834destructor TAsyncWriteStream.Destroy;
835begin
836  if Assigned(NotifyHandle) then
837    EventLoop.ClearCanWriteNotify(NotifyHandle);
838  inherited Destroy;
839end;
840
841procedure TAsyncWriteStream.StopAndFree;
842begin
843  if InCallback then
844  begin
845    if Assigned(NotifyHandle) then
846    begin
847      EventLoop.ClearCanWriteNotify(NotifyHandle);
848      NotifyHandle := nil;
849    end;
850    DoStopAndFree := True;
851  end else
852    Self.Free;
853end;
854
855
856end.
857