1 (*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  *)
19 unit Thrift.Transport;
20 
21 {$I Thrift.Defines.inc}
22 {$SCOPEDENUMS ON}
23 
24 interface
25 
26 uses
27   Classes,
28   SysUtils,
29   Math,
30   Generics.Collections,
31   {$IFDEF OLD_UNIT_NAMES}
32     WinSock, Sockets,
33   {$ELSE}
34     Winapi.WinSock,
35     {$IFDEF OLD_SOCKETS}
36       Web.Win.Sockets,
37     {$ELSE}
38       Thrift.Socket,
39     {$ENDIF}
40   {$ENDIF}
41   Thrift.Configuration,
42   Thrift.Collections,
43   Thrift.Exception,
44   Thrift.Utils,
45   Thrift.WinHTTP,
46   Thrift.Stream;
47 
48 const
49   DEFAULT_MAX_MESSAGE_SIZE = 100 * 1024 * 1024; // 100 MB
50   DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
51 
52 type
53   IStreamTransport = interface;
54 
55   ITransport = interface
56     ['{52F81383-F880-492F-8AA7-A66B85B93D6B}']
GetIsOpennull57     function GetIsOpen: Boolean;
58     property IsOpen: Boolean read GetIsOpen;
Peeknull59     function Peek: Boolean;
60     procedure Open;
61     procedure Close;
62 
Readnull63     function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
Readnull64     function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
ReadAllnull65     function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
ReadAllnull66     function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
67     procedure Write( const buf: TBytes); overload;
68     procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
69     procedure Write( const pBuf : Pointer; off, len : Integer); overload;
70     procedure Write( const pBuf : Pointer; len : Integer); overload;
71     procedure Flush;
72 
Configurationnull73     function  Configuration : IThriftConfiguration;
MaxMessageSizenull74     function  MaxMessageSize : Integer;
75     procedure ResetConsumedMessageSize( const knownSize : Int64 = -1);
76     procedure CheckReadBytesAvailable( const numBytes : Int64);
77     procedure UpdateKnownMessageSize( const size : Int64);
78   end;
79 
80   TTransportBase = class abstract( TInterfacedObject)
81   strict protected
GetIsOpennull82     function GetIsOpen: Boolean; virtual; abstract;
83     property IsOpen: Boolean read GetIsOpen;
Peeknull84     function Peek: Boolean; virtual;
85     procedure Open(); virtual; abstract;
86     procedure Close(); virtual; abstract;
87 
Readnull88     function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
Readnull89     function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual; abstract;
ReadAllnull90     function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;  overload; inline;
ReadAllnull91     function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual;
92     procedure Write( const buf: TBytes); overload; inline;
93     procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; inline;
94     procedure Write( const pBuf : Pointer; len : Integer); overload; inline;
95     procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract;
96     procedure Flush; virtual;
97 
Configurationnull98     function  Configuration : IThriftConfiguration; virtual; abstract;
99     procedure UpdateKnownMessageSize( const size : Int64); virtual; abstract;
100   end;
101 
102   // base class for all endpoint transports, e.g. sockets, pipes or HTTP
103   TEndpointTransportBase = class abstract( TTransportBase, ITransport)
104   strict private
105     FRemainingMessageSize : Int64;
106     FKnownMessageSize : Int64;
107     FConfiguration : IThriftConfiguration;
108   strict protected
Configurationnull109     function  Configuration : IThriftConfiguration; override;
MaxMessageSizenull110     function  MaxMessageSize : Integer;
111     property  RemainingMessageSize : Int64 read FRemainingMessageSize;
112     property  KnownMessageSize : Int64 read FKnownMessageSize;
113     procedure ResetConsumedMessageSize( const newSize : Int64 = -1);
114     procedure UpdateKnownMessageSize(const size : Int64); override;
115     procedure CheckReadBytesAvailable(const numBytes : Int64); inline;
116     procedure CountConsumedMessageBytes(const numBytes : Int64); inline;
117   public
118     constructor Create( const aConfig : IThriftConfiguration);  reintroduce;
119   end;
120 
121   // base class for all layered transports, e.g. framed
122   TLayeredTransportBase<T : ITransport> = class abstract( TTransportBase, ITransport)
123   strict private
124     FTransport : T;
125   strict protected
126     property  InnerTransport : T read FTransport;
GetUnderlyingTransportnull127     function  GetUnderlyingTransport: ITransport;
Configurationnull128     function  Configuration : IThriftConfiguration; override;
129     procedure UpdateKnownMessageSize( const size : Int64); override;
MaxMessageSizenull130     function  MaxMessageSize : Integer;  inline;
131     procedure ResetConsumedMessageSize( const knownSize : Int64 = -1);  inline;
132     procedure CheckReadBytesAvailable( const numBytes : Int64);   virtual;
133   public
134     constructor Create( const aTransport: T); reintroduce;
135     property UnderlyingTransport: ITransport read GetUnderlyingTransport;
136   end;
137 
138   TTransportException = class abstract( TException)
139   public
140     type
141       TExceptionType = (
142         Unknown,
143         NotOpen,
144         AlreadyOpen,
145         TimedOut,
146         EndOfFile,
147         BadArgs,
148         Interrupted,
149         CorruptedData
150       );
151   strict protected
152     constructor HiddenCreate(const Msg: string);
GetTypenull153     class function GetType: TExceptionType;  virtual; abstract;
154   public
Createnull155     class function Create( aType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
Createnull156     class function Create( const msg: string): TTransportException; reintroduce; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
Createnull157     class function Create( aType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
158     property Type_: TExceptionType read GetType;
159   end;
160 
161   // Needed to remove deprecation warning
162   TTransportExceptionSpecialized = class abstract (TTransportException)
163   public
164     constructor Create(const Msg: string);
165   end;
166 
167   TTransportExceptionUnknown = class (TTransportExceptionSpecialized)
168   strict protected
GetTypenull169     class function GetType: TTransportException.TExceptionType;  override;
170   end;
171 
172   TTransportExceptionNotOpen = class (TTransportExceptionSpecialized)
173   strict protected
GetTypenull174     class function GetType: TTransportException.TExceptionType;  override;
175   end;
176 
177   TTransportExceptionAlreadyOpen = class (TTransportExceptionSpecialized)
178   strict protected
GetTypenull179     class function GetType: TTransportException.TExceptionType;  override;
180   end;
181 
182   TTransportExceptionTimedOut = class (TTransportExceptionSpecialized)
183   strict protected
GetTypenull184     class function GetType: TTransportException.TExceptionType;  override;
185   end;
186 
187   TTransportExceptionEndOfFile = class (TTransportExceptionSpecialized)
188   strict protected
GetTypenull189     class function GetType: TTransportException.TExceptionType;  override;
190   end;
191 
192   TTransportExceptionBadArgs = class (TTransportExceptionSpecialized)
193   strict protected
GetTypenull194     class function GetType: TTransportException.TExceptionType;  override;
195   end;
196 
197   TTransportExceptionInterrupted = class (TTransportExceptionSpecialized)
198   strict protected
GetTypenull199     class function GetType: TTransportException.TExceptionType;  override;
200   end;
201 
202   TTransportExceptionCorruptedData = class (TTransportExceptionSpecialized)
203   protected
GetTypenull204     class function GetType: TTransportException.TExceptionType;  override;
205   end;
206 
207   TSecureProtocol = (
208     SSL_2, SSL_3, TLS_1,   // outdated, for compatibilty only
209     TLS_1_1, TLS_1_2       // secure (as of today)
210   );
211 
212   TSecureProtocols = set of TSecureProtocol;
213 
214   IHTTPClient = interface( ITransport )
215     ['{7BF615DD-8680-4004-A5B2-88947BA3BA3D}']
216     procedure SetDnsResolveTimeout(const Value: Integer);
GetDnsResolveTimeoutnull217     function GetDnsResolveTimeout: Integer;
218     procedure SetConnectionTimeout(const Value: Integer);
GetConnectionTimeoutnull219     function GetConnectionTimeout: Integer;
220     procedure SetSendTimeout(const Value: Integer);
GetSendTimeoutnull221     function GetSendTimeout: Integer;
222     procedure SetReadTimeout(const Value: Integer);
GetReadTimeoutnull223     function GetReadTimeout: Integer;
GetCustomHeadersnull224     function GetCustomHeaders: IThriftDictionary<string,string>;
225     procedure SendRequest;
GetSecureProtocolsnull226     function GetSecureProtocols : TSecureProtocols;
227     procedure SetSecureProtocols( const value : TSecureProtocols);
228 
229     property DnsResolveTimeout: Integer read GetDnsResolveTimeout write SetDnsResolveTimeout;
230     property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
231     property SendTimeout: Integer read GetSendTimeout write SetSendTimeout;
232     property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
233     property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
234     property SecureProtocols : TSecureProtocols read GetSecureProtocols write SetSecureProtocols;
235   end;
236 
237   IServerTransport = interface
238     ['{FA01363F-6B40-482F-971E-4A085535EFC8}']
239     procedure Listen;
240     procedure Close;
Acceptnull241     function Accept( const fnAccepting: TProc): ITransport;
Configurationnull242     function Configuration : IThriftConfiguration;
243   end;
244 
245   TServerTransportImpl = class( TInterfacedObject, IServerTransport)
246   strict private
247     FConfig : IThriftConfiguration;
248   strict protected
Configurationnull249     function  Configuration : IThriftConfiguration;
250     procedure Listen; virtual; abstract;
251     procedure Close; virtual; abstract;
Acceptnull252     function  Accept( const fnAccepting: TProc): ITransport;  virtual; abstract;
253   public
254     constructor Create( const aConfig : IThriftConfiguration);
255   end;
256 
257   ITransportFactory = interface
258     ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
GetTransportnull259     function GetTransport( const aTransport: ITransport): ITransport;
260   end;
261 
262   TTransportFactoryImpl = class ( TInterfacedObject, ITransportFactory)
263   strict protected
GetTransportnull264     function GetTransport( const aTransport: ITransport): ITransport; virtual;
265   end;
266 
267 
268   TTcpSocketStreamImpl = class( TThriftStreamImpl)
269 {$IFDEF OLD_SOCKETS}
270   strict private type
271     TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
272   strict private
273     FTcpClient : TCustomIpClient;
274     FTimeout : Integer;
Selectnull275     function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
276                      TimeOut: Integer; var wsaError : Integer): Integer;
WaitForDatanull277     function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
278                           var wsaError, bytesReady : Integer): TWaitForData;
279 {$ELSE}
280     FTcpClient: TSocket;
281   strict protected const
282     SLEEP_TIME = 200;
283 {$ENDIF}
284   strict protected
285     procedure Write( const pBuf : Pointer; offset, count: Integer); override;
Readnull286     function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
287     procedure Open; override;
288     procedure Close; override;
289     procedure Flush; override;
290 
IsOpennull291     function IsOpen: Boolean; override;
ToArraynull292     function ToArray: TBytes; override;
293   public
294 {$IFDEF OLD_SOCKETS}
295     constructor Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer = DEFAULT_THRIFT_TIMEOUT);
296 {$ELSE}
297     constructor Create( const aTcpClient: TSocket; const aTimeout : Longword = DEFAULT_THRIFT_TIMEOUT);
298 {$ENDIF}
299   end;
300 
301   IStreamTransport = interface( ITransport )
302     ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
GetInputStreamnull303     function GetInputStream: IThriftStream;
GetOutputStreamnull304     function GetOutputStream: IThriftStream;
305     property InputStream : IThriftStream read GetInputStream;
306     property OutputStream : IThriftStream read GetOutputStream;
307   end;
308 
309   TStreamTransportImpl = class( TEndpointTransportBase, IStreamTransport)
310   strict protected
311     FInputStream : IThriftStream;
312     FOutputStream : IThriftStream;
313   strict protected
GetIsOpennull314     function GetIsOpen: Boolean; override;
315 
GetInputStreamnull316     function GetInputStream: IThriftStream;
GetOutputStreamnull317     function GetOutputStream: IThriftStream;
318 
319   strict protected
320     procedure Open; override;
321     procedure Close; override;
322     procedure Flush; override;
Readnull323     function  Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
324     procedure Write( const pBuf : Pointer; off, len : Integer); override;
325   public
326     constructor Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration = nil);  reintroduce;
327     destructor Destroy; override;
328 
329     property InputStream : IThriftStream read GetInputStream;
330     property OutputStream : IThriftStream read GetOutputStream;
331   end;
332 
333   TBufferedStreamImpl = class( TThriftStreamImpl)
334   strict private
335     FStream : IThriftStream;
336     FBufSize : Integer;
337     FReadBuffer : TMemoryStream;
338     FWriteBuffer : TMemoryStream;
339   strict protected
340     procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
Readnull341     function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
342     procedure Open;  override;
343     procedure Close; override;
344     procedure Flush; override;
IsOpennull345     function IsOpen: Boolean; override;
ToArraynull346     function ToArray: TBytes; override;
Sizenull347     function Size : Int64; override;
Positionnull348     function Position : Int64; override;
349   public
350     constructor Create( const aStream: IThriftStream; const aBufSize : Integer);
351     destructor Destroy; override;
352   end;
353 
354   TServerSocketImpl = class( TServerTransportImpl)
355   strict private
356 {$IFDEF OLD_SOCKETS}
357     FServer : TTcpServer;
358     FPort : Integer;
359     FClientTimeout : Integer;
360 {$ELSE}
361     FServer: TServerSocket;
362 {$ENDIF}
363     FUseBufferedSocket : Boolean;
364     FOwnsServer : Boolean;
365 
366   strict protected
Acceptnull367     function Accept( const fnAccepting: TProc) : ITransport; override;
368 
369   public
370     {$IFDEF OLD_SOCKETS}
371     constructor Create( const aServer: TTcpServer; const aClientTimeout : Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil);  overload;
372     constructor Create( const aPort: Integer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil);  overload;
373     {$ELSE}
374     constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil);  overload;
375     constructor Create( const aPort: Integer; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil);  overload;
376     {$ENDIF}
377 
378     destructor Destroy; override;
379     procedure Listen; override;
380     procedure Close; override;
381   end;
382 
383   TBufferedTransportImpl = class( TLayeredTransportBase<IStreamTransport>)
384   strict private
385     FInputBuffer : IThriftStream;
386     FOutputBuffer : IThriftStream;
387     FBufSize : Integer;
388 
389     procedure InitBuffers;
390   strict protected
GetIsOpennull391     function GetIsOpen: Boolean; override;
392     procedure Flush; override;
393   public
394     type
395       TFactory = class( TTransportFactoryImpl )
396       public
GetTransportnull397         function GetTransport( const aTransport: ITransport): ITransport; override;
398       end;
399 
400     constructor Create( const aTransport : IStreamTransport; const aBufSize: Integer = 1024);
401     procedure Open(); override;
402     procedure Close(); override;
Readnull403     function  Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
404     procedure Write( const pBuf : Pointer; off, len : Integer); override;
405     procedure CheckReadBytesAvailable( const value : Int64); override;
406     property IsOpen: Boolean read GetIsOpen;
407   end;
408 
409   TSocketImpl = class(TStreamTransportImpl)
410   strict private
411 {$IFDEF OLD_SOCKETS}
412     FClient : TCustomIpClient;
413 {$ELSE}
414     FClient: TSocket;
415 {$ENDIF}
416     FOwnsClient : Boolean;
417     FHost : string;
418     FPort : Integer;
419 {$IFDEF OLD_SOCKETS}
420     FTimeout : Integer;
421 {$ELSE}
422     FTimeout : Longword;
423 {$ENDIF}
424 
425     procedure InitSocket;
426   strict protected
GetIsOpennull427     function GetIsOpen: Boolean; override;
428   public
429 {$IFDEF OLD_SOCKETS}
430     constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
431     constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
432 {$ELSE}
433     constructor Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration = nil); overload;
434     constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
435 {$ENDIF}
436     destructor Destroy; override;
437 
438     procedure Open; override;
439     procedure Close; override;
440 {$IFDEF OLD_SOCKETS}
441     property TcpClient: TCustomIpClient read FClient;
442 {$ELSE}
443     property TcpClient: TSocket read FClient;
444 {$ENDIF}
445     property Host : string read FHost;
446     property Port: Integer read FPort;
447   end;
448 
449   TFramedTransportImpl = class( TLayeredTransportBase<ITransport>)
450   strict protected type
451     TFramedHeader = Int32;
452   strict protected
453     FWriteBuffer : TMemoryStream;
454     FReadBuffer : TMemoryStream;
455 
456     procedure InitWriteBuffer;
457     procedure ReadFrame;
458 
459     procedure Open(); override;
GetIsOpennull460     function  GetIsOpen: Boolean; override;
461 
462     procedure Close(); override;
Readnull463     function  Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
464     procedure Write( const pBuf : Pointer; off, len : Integer); override;
465     procedure CheckReadBytesAvailable( const value : Int64);  override;
466     procedure Flush; override;
467 
468   public
469     type
470       TFactory = class( TTransportFactoryImpl )
471       public
GetTransportnull472         function GetTransport( const aTransport: ITransport): ITransport; override;
473       end;
474 
475     constructor Create( const aTransport: ITransport); overload;
476     destructor Destroy; override;
477   end;
478 
479 
480 const
481   DEFAULT_THRIFT_SECUREPROTOCOLS = [ TSecureProtocol.TLS_1_1, TSecureProtocol.TLS_1_2];
482 
483 implementation
484 
485 
486 { TTransportBase }
487 
488 procedure TTransportBase.Flush;
489 begin
490   // nothing to do
491 end;
492 
Peeknull493 function TTransportBase.Peek: Boolean;
494 begin
495   Result := IsOpen;
496 end;
497 
Readnull498 function TTransportBase.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
499 begin
500   if Length(buf) > 0
501   then result := Read( @buf[0], Length(buf), off, len)
502   else result := 0;
503 end;
504 
ReadAllnull505 function TTransportBase.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
506 begin
507   if Length(buf) > 0
508   then result := ReadAll( @buf[0], Length(buf), off, len)
509   else result := 0;
510 end;
511 
ReadAllnull512 function TTransportBase.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
513 var ret : Integer;
514 begin
515   result := 0;
516   while result < len do begin
517     ret := Read( pBuf, buflen, off + result, len - result);
518     if ret > 0
519     then Inc( result, ret)
520     else raise TTransportExceptionNotOpen.Create( 'Cannot read, Remote side has closed' );
521   end;
522 end;
523 
524 procedure TTransportBase.Write( const buf: TBytes);
525 begin
526   if Length(buf) > 0
527   then Write( @buf[0], 0, Length(buf));
528 end;
529 
530 procedure TTransportBase.Write( const buf: TBytes; off: Integer; len: Integer);
531 begin
532   if Length(buf) > 0
533   then Write( @buf[0], off, len);
534 end;
535 
536 procedure TTransportBase.Write( const pBuf : Pointer; len : Integer);
537 begin
538   Self.Write( pBuf, 0, len);
539 end;
540 
541 
542 { TEndpointTransportBase }
543 
544 constructor TEndpointTransportBase.Create( const aConfig : IThriftConfiguration);
545 begin
546   inherited Create;
547 
548   if aConfig <> nil
549   then FConfiguration := aConfig
550   else FConfiguration := TThriftConfigurationImpl.Create;
551 
552   ResetConsumedMessageSize;
553 end;
554 
555 
Configurationnull556 function TEndpointTransportBase.Configuration : IThriftConfiguration;
557 begin
558   result := FConfiguration;
559 end;
560 
561 
MaxMessageSizenull562 function TEndpointTransportBase.MaxMessageSize : Integer;
563 begin
564   ASSERT( Configuration <> nil);
565   result := Configuration.MaxMessageSize;
566 end;
567 
568 
569 procedure TEndpointTransportBase.ResetConsumedMessageSize( const newSize : Int64);
570 // Resets RemainingMessageSize to the configured maximum
571 begin
572   // full reset
573   if newSize < 0 then begin
574     FKnownMessageSize := MaxMessageSize;
575     FRemainingMessageSize := MaxMessageSize;
576     Exit;
577   end;
578 
579   // update only: message size can shrink, but not grow
580   ASSERT( KnownMessageSize <= MaxMessageSize);
581   if newSize > KnownMessageSize
582   then raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
583 
584   FKnownMessageSize := newSize;
585   FRemainingMessageSize := newSize;
586 end;
587 
588 
589 procedure TEndpointTransportBase.UpdateKnownMessageSize( const size : Int64);
590 // Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport).
591 // Will throw if we already consumed too many bytes.
592 var consumed : Int64;
593 begin
594   consumed := KnownMessageSize - RemainingMessageSize;
595   ResetConsumedMessageSize(size);
596   CountConsumedMessageBytes(consumed);
597 end;
598 
599 
600 procedure TEndpointTransportBase.CheckReadBytesAvailable( const numBytes : Int64);
601 // Throws if there are not enough bytes in the input stream to satisfy a read of numBytes bytes of data
602 begin
603   if RemainingMessageSize < numBytes
604   then raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
605 end;
606 
607 
608 procedure TEndpointTransportBase.CountConsumedMessageBytes( const numBytes : Int64);
609 // Consumes numBytes from the RemainingMessageSize.
610 begin
611   if (RemainingMessageSize >= numBytes)
612   then Dec( FRemainingMessageSize, numBytes)
613   else begin
614     FRemainingMessageSize := 0;
615     raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
616   end;
617 end;
618 
619 { TLayeredTransportBase }
620 
621 constructor TLayeredTransportBase<T>.Create( const aTransport: T);
622 begin
623   inherited Create;
624   FTransport := aTransport;
625 end;
626 
TLayeredTransportBasenull627 function TLayeredTransportBase<T>.GetUnderlyingTransport: ITransport;
628 begin
629   result := InnerTransport;
630 end;
631 
TLayeredTransportBasenull632 function TLayeredTransportBase<T>.Configuration : IThriftConfiguration;
633 begin
634   result := InnerTransport.Configuration;
635 end;
636 
637 procedure TLayeredTransportBase<T>.UpdateKnownMessageSize( const size : Int64);
638 begin
639   InnerTransport.UpdateKnownMessageSize( size);
640 end;
641 
642 
TLayeredTransportBasenull643 function TLayeredTransportBase<T>.MaxMessageSize : Integer;
644 begin
645   result := InnerTransport.MaxMessageSize;
646 end;
647 
648 
649 procedure TLayeredTransportBase<T>.ResetConsumedMessageSize( const knownSize : Int64 = -1);
650 begin
651   InnerTransport.ResetConsumedMessageSize( knownSize);
652 end;
653 
654 
655 procedure TLayeredTransportBase<T>.CheckReadBytesAvailable( const numBytes : Int64);
656 begin
657   InnerTransport.CheckReadBytesAvailable( numBytes);
658 end;
659 
660 
661 
662 { TTransportException }
663 
664 constructor TTransportException.HiddenCreate(const Msg: string);
665 begin
666   inherited Create(Msg);
667 end;
668 
TTransportException.Createnull669 class function TTransportException.Create(aType: TExceptionType): TTransportException;
670 begin
671   //no inherited;
672 {$WARN SYMBOL_DEPRECATED OFF}
673   Result := Create(aType, '')
674 {$WARN SYMBOL_DEPRECATED DEFAULT}
675 end;
676 
TTransportException.Createnull677 class function TTransportException.Create(aType: TExceptionType; const msg: string): TTransportException;
678 begin
679   case aType of
680     TExceptionType.NotOpen:     Result := TTransportExceptionNotOpen.Create(msg);
681     TExceptionType.AlreadyOpen: Result := TTransportExceptionAlreadyOpen.Create(msg);
682     TExceptionType.TimedOut:    Result := TTransportExceptionTimedOut.Create(msg);
683     TExceptionType.EndOfFile:   Result := TTransportExceptionEndOfFile.Create(msg);
684     TExceptionType.BadArgs:     Result := TTransportExceptionBadArgs.Create(msg);
685     TExceptionType.Interrupted: Result := TTransportExceptionInterrupted.Create(msg);
686   else
687     ASSERT( TExceptionType.Unknown = aType);
688     Result := TTransportExceptionUnknown.Create(msg);
689   end;
690 end;
691 
TTransportException.Createnull692 class function TTransportException.Create(const msg: string): TTransportException;
693 begin
694   Result := TTransportExceptionUnknown.Create(Msg);
695 end;
696 
697 { TTransportExceptionSpecialized }
698 
699 constructor TTransportExceptionSpecialized.Create(const Msg: string);
700 begin
701   inherited HiddenCreate(Msg);
702 end;
703 
704 { specialized TTransportExceptions }
705 
TTransportExceptionUnknown.GetTypenull706 class function TTransportExceptionUnknown.GetType: TTransportException.TExceptionType;
707 begin
708   result := TExceptionType.Unknown;
709 end;
710 
TTransportExceptionNotOpen.GetTypenull711 class function TTransportExceptionNotOpen.GetType: TTransportException.TExceptionType;
712 begin
713   result := TExceptionType.NotOpen;
714 end;
715 
TTransportExceptionAlreadyOpen.GetTypenull716 class function TTransportExceptionAlreadyOpen.GetType: TTransportException.TExceptionType;
717 begin
718   result := TExceptionType.AlreadyOpen;
719 end;
720 
TTransportExceptionTimedOut.GetTypenull721 class function TTransportExceptionTimedOut.GetType: TTransportException.TExceptionType;
722 begin
723   result := TExceptionType.TimedOut;
724 end;
725 
TTransportExceptionEndOfFile.GetTypenull726 class function TTransportExceptionEndOfFile.GetType: TTransportException.TExceptionType;
727 begin
728   result := TExceptionType.EndOfFile;
729 end;
730 
TTransportExceptionBadArgs.GetTypenull731 class function TTransportExceptionBadArgs.GetType: TTransportException.TExceptionType;
732 begin
733   result := TExceptionType.BadArgs;
734 end;
735 
TTransportExceptionInterrupted.GetTypenull736 class function TTransportExceptionInterrupted.GetType: TTransportException.TExceptionType;
737 begin
738   result := TExceptionType.Interrupted;
739 end;
740 
TTransportExceptionCorruptedData.GetTypenull741 class function TTransportExceptionCorruptedData.GetType: TTransportException.TExceptionType;
742 begin
743   result := TExceptionType.CorruptedData;
744 end;
745 
746 { TTransportFactoryImpl }
747 
TTransportFactoryImpl.GetTransportnull748 function TTransportFactoryImpl.GetTransport( const aTransport: ITransport): ITransport;
749 begin
750   Result := aTransport;
751 end;
752 
753 
754 { TServerTransportImpl }
755 
756 constructor TServerTransportImpl.Create( const aConfig : IThriftConfiguration);
757 begin
758   inherited Create;
759   if aConfig <> nil
760   then FConfig := aConfig
761   else FConfig := TThriftConfigurationImpl.Create;
762 end;
763 
TServerTransportImpl.Configurationnull764 function TServerTransportImpl.Configuration : IThriftConfiguration;
765 begin
766   result := FConfig;
767 end;
768 
769 { TServerSocket }
770 
771 {$IFDEF OLD_SOCKETS}
772 constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer; const aConfig : IThriftConfiguration);
773 {$ELSE}
774 constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword; const aConfig : IThriftConfiguration);
775 {$ENDIF}
776 begin
777   inherited Create( aConfig);
778   FServer := aServer;
779 
780 
781 {$IFDEF OLD_SOCKETS}
782   FClientTimeout := aClientTimeout;
783 {$ELSE}
784   FServer.RecvTimeout := aClientTimeout;
785   FServer.SendTimeout := aClientTimeout;
786 {$ENDIF}
787 end;
788 
789 
790 {$IFDEF OLD_SOCKETS}
791 constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration);
792 {$ELSE}
793 constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration);
794 {$ENDIF}
795 begin
796   inherited Create( aConfig);
797 
798 {$IFDEF OLD_SOCKETS}
799   FPort := aPort;
800   FClientTimeout := aClientTimeout;
801 
802   FOwnsServer := True;
803   FServer := TTcpServer.Create( nil );
804   FServer.BlockMode := bmBlocking;
805   {$IF CompilerVersion >= 21.0}
806   FServer.LocalPort := AnsiString( IntToStr( FPort));
807   {$ELSE}
808   FServer.LocalPort := IntToStr( FPort);
809   {$IFEND}
810 {$ELSE}
811   FOwnsServer := True;
812   FServer := TServerSocket.Create(aPort, aClientTimeout, aClientTimeout);
813 {$ENDIF}
814 
815   FUseBufferedSocket := aUseBufferedSockets;
816 end;
817 
818 destructor TServerSocketImpl.Destroy;
819 begin
820   if FOwnsServer then begin
821     FServer.Free;
822     FServer := nil;
823   end;
824   inherited;
825 end;
826 
Acceptnull827 function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
828 var
829 {$IFDEF OLD_SOCKETS}
830   client : TCustomIpClient;
831 {$ELSE}
832   client: TSocket;
833 {$ENDIF}
834   trans  : IStreamTransport;
835 begin
836   if FServer = nil then begin
837     raise TTransportExceptionNotOpen.Create('No underlying server socket.');
838   end;
839 
840 {$IFDEF OLD_SOCKETS}
841   client := nil;
842   try
843     client := TCustomIpClient.Create(nil);
844 
845     if Assigned(fnAccepting)
846     then fnAccepting();
847 
848     if not FServer.Accept( client) then begin
849       client.Free;
850       Result := nil;
851       Exit;
852     end;
853 
854     if client = nil then begin
855       Result := nil;
856       Exit;
857     end;
858 
859     trans := TSocketImpl.Create( client, TRUE, FClientTimeout, Configuration);
860     client := nil;  // trans owns it now
861 
862     if FUseBufferedSocket
863     then result := TBufferedTransportImpl.Create( trans)
864     else result := trans;
865 
866   except
867     on E: Exception do begin
868       client.Free;
869       raise TTransportExceptionUnknown.Create(E.ToString);
870     end;
871   end;
872 {$ELSE}
873   if Assigned(fnAccepting) then
874     fnAccepting();
875 
876   client := FServer.Accept;
877   try
878     trans := TSocketImpl.Create(client, TRUE, Configuration);
879     client := nil;
880 
881     if FUseBufferedSocket then
882       Result := TBufferedTransportImpl.Create(trans)
883     else
884       Result := trans;
885   except
886     client.Free;
887     raise;
888   end;
889 {$ENDIF}
890 end;
891 
892 procedure TServerSocketImpl.Listen;
893 begin
894   if FServer <> nil then
895   begin
896 {$IFDEF OLD_SOCKETS}
897     try
898       FServer.Active := True;
899     except
900       on E: Exception
901       do raise TTransportExceptionUnknown.Create('Could not accept on listening socket: ' + E.Message);
902     end;
903 {$ELSE}
904     FServer.Listen;
905 {$ENDIF}
906   end;
907 end;
908 
909 procedure TServerSocketImpl.Close;
910 begin
911   if FServer <> nil then
912 {$IFDEF OLD_SOCKETS}
913     try
914       FServer.Active := False;
915     except
916       on E: Exception
917       do raise TTransportExceptionUnknown.Create('Error on closing socket : ' + E.Message);
918     end;
919 {$ELSE}
920     FServer.Close;
921 {$ENDIF}
922 end;
923 
924 { TSocket }
925 
926 {$IFDEF OLD_SOCKETS}
927 constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer; const aConfig : IThriftConfiguration);
928 {$ELSE}
929 constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration);
930 {$ENDIF}
931 var stream : IThriftStream;
932 begin
933   FClient := aClient;
934   FOwnsClient := aOwnsClient;
935 
936 {$IFDEF OLD_SOCKETS}
937   FTimeout := aTimeout;
938 {$ELSE}
939   FTimeout := aClient.RecvTimeout;
940 {$ENDIF}
941 
942   stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
943   inherited Create( stream, stream, aConfig);
944 end;
945 
946 
947 {$IFDEF OLD_SOCKETS}
948 constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer; const aConfig : IThriftConfiguration);
949 {$ELSE}
950 constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword; const aConfig : IThriftConfiguration);
951 {$ENDIF}
952 begin
953   inherited Create(nil,nil, aConfig);
954   FHost := aHost;
955   FPort := aPort;
956   FTimeout := aTimeout;
957   InitSocket;
958 end;
959 
960 destructor TSocketImpl.Destroy;
961 begin
962   if FOwnsClient
963   then FreeAndNil( FClient);
964   inherited;
965 end;
966 
967 procedure TSocketImpl.Close;
968 begin
969   inherited Close;
970 
971   FInputStream := nil;
972   FOutputStream := nil;
973 
974   if FOwnsClient
975   then FreeAndNil( FClient)
976   else FClient := nil;
977 end;
978 
TSocketImpl.GetIsOpennull979 function TSocketImpl.GetIsOpen: Boolean;
980 begin
981 {$IFDEF OLD_SOCKETS}
982   Result := (FClient <> nil) and FClient.Connected;
983 {$ELSE}
984   Result := (FClient <> nil) and FClient.IsOpen
985 {$ENDIF}
986 end;
987 
988 procedure TSocketImpl.InitSocket;
989 var
990   stream : IThriftStream;
991 begin
992   if FOwnsClient
993   then FreeAndNil( FClient)
994   else FClient := nil;
995 
996 {$IFDEF OLD_SOCKETS}
997   FClient := TTcpClient.Create( nil);
998 {$ELSE}
999   FClient := TSocket.Create(FHost, FPort);
1000 {$ENDIF}
1001   FOwnsClient := True;
1002 
1003   stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
1004   FInputStream := stream;
1005   FOutputStream := stream;
1006 end;
1007 
1008 procedure TSocketImpl.Open;
1009 begin
1010   if IsOpen then begin
1011     raise TTransportExceptionAlreadyOpen.Create('Socket already connected');
1012   end;
1013 
1014   if FHost = '' then begin
1015     raise TTransportExceptionNotOpen.Create('Cannot open null host');
1016   end;
1017 
1018   if Port <= 0 then begin
1019     raise TTransportExceptionNotOpen.Create('Cannot open without port');
1020   end;
1021 
1022   if FClient = nil
1023   then InitSocket;
1024 
1025 {$IFDEF OLD_SOCKETS}
1026   FClient.RemoteHost := TSocketHost( Host);
1027   FClient.RemotePort := TSocketPort( IntToStr( Port));
1028   FClient.Connect;
1029 {$ELSE}
1030   FClient.Open;
1031 {$ENDIF}
1032 
1033   FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
1034   FOutputStream := FInputStream;
1035 end;
1036 
1037 { TBufferedStream }
1038 
1039 procedure TBufferedStreamImpl.Close;
1040 begin
1041   Flush;
1042   FStream := nil;
1043 
1044   FReadBuffer.Free;
1045   FReadBuffer := nil;
1046 
1047   FWriteBuffer.Free;
1048   FWriteBuffer := nil;
1049 end;
1050 
1051 constructor TBufferedStreamImpl.Create( const aStream: IThriftStream; const aBufSize : Integer);
1052 begin
1053   inherited Create;
1054   FStream := aStream;
1055   FBufSize := aBufSize;
1056   FReadBuffer := TMemoryStream.Create;
1057   FWriteBuffer := TMemoryStream.Create;
1058 end;
1059 
1060 destructor TBufferedStreamImpl.Destroy;
1061 begin
1062   Close;
1063   inherited;
1064 end;
1065 
1066 procedure TBufferedStreamImpl.Flush;
1067 var
1068   buf : TBytes;
1069   len : Integer;
1070 begin
1071   if IsOpen then begin
1072     len := FWriteBuffer.Size;
1073     if len > 0 then begin
1074       SetLength( buf, len );
1075       FWriteBuffer.Position := 0;
1076       FWriteBuffer.Read( Pointer(@buf[0])^, len );
1077       FStream.Write( buf, 0, len );
1078     end;
1079     FWriteBuffer.Clear;
1080   end;
1081 end;
1082 
TBufferedStreamImpl.IsOpennull1083 function TBufferedStreamImpl.IsOpen: Boolean;
1084 begin
1085   Result := (FWriteBuffer <> nil)
1086         and (FReadBuffer <> nil)
1087         and (FStream <> nil)
1088         and FStream.IsOpen;
1089 end;
1090 
1091 procedure TBufferedStreamImpl.Open;
1092 begin
1093   FStream.Open;
1094 end;
1095 
Readnull1096 function TBufferedStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
1097 var
1098   nRead : Integer;
1099   tempbuf : TBytes;
1100   pTmp : PByte;
1101 begin
1102   inherited;
1103   Result := 0;
1104 
1105   if IsOpen then begin
1106     while count > 0 do begin
1107 
1108       if FReadBuffer.Position >= FReadBuffer.Size then begin
1109         FReadBuffer.Clear;
1110         SetLength( tempbuf, FBufSize);
1111         nRead := FStream.Read( tempbuf, 0, FBufSize );
1112         if nRead = 0 then Break; // avoid infinite loop
1113 
1114         FReadBuffer.WriteBuffer( Pointer(@tempbuf[0])^, nRead );
1115         FReadBuffer.Position := 0;
1116       end;
1117 
1118       if FReadBuffer.Position < FReadBuffer.Size then begin
1119         nRead := Min( FReadBuffer.Size - FReadBuffer.Position, count);
1120         pTmp  := pBuf;
1121         Inc( pTmp, offset);
1122         Inc( Result, FReadBuffer.Read( pTmp^, nRead));
1123         Dec( count, nRead);
1124         Inc( offset, nRead);
1125       end;
1126     end;
1127   end;
1128 end;
1129 
1130 
ToArraynull1131 function TBufferedStreamImpl.ToArray: TBytes;
1132 var len : Integer;
1133 begin
1134   if IsOpen
1135   then len := FReadBuffer.Size
1136   else len := 0;
1137 
1138   SetLength( Result, len);
1139 
1140   if len > 0 then begin
1141     FReadBuffer.Position := 0;
1142     FReadBuffer.Read( Pointer(@Result[0])^, len );
1143   end;
1144 end;
1145 
1146 procedure TBufferedStreamImpl.Write( const pBuf : Pointer; offset: Integer; count: Integer);
1147 var pTmp : PByte;
1148 begin
1149   inherited;
1150   if count > 0 then begin
1151     if IsOpen then begin
1152       pTmp := pBuf;
1153       Inc( pTmp, offset);
1154       FWriteBuffer.Write( pTmp^, count );
1155       if FWriteBuffer.Size > FBufSize then begin
1156         Flush;
1157       end;
1158     end;
1159   end;
1160 end;
1161 
1162 
TBufferedStreamImpl.Sizenull1163 function TBufferedStreamImpl.Size : Int64;
1164 begin
1165   result := FReadBuffer.Size;
1166 end;
1167 
1168 
Positionnull1169 function TBufferedStreamImpl.Position : Int64;
1170 begin
1171   result := FReadBuffer.Position;
1172 end;
1173 
1174 
1175 { TStreamTransportImpl }
1176 
1177 constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration);
1178 begin
1179   inherited Create( aConfig);
1180   FInputStream := aInputStream;
1181   FOutputStream := aOutputStream;
1182 end;
1183 
1184 destructor TStreamTransportImpl.Destroy;
1185 begin
1186   FInputStream := nil;
1187   FOutputStream := nil;
1188   inherited;
1189 end;
1190 
1191 procedure TStreamTransportImpl.Close;
1192 begin
1193   FInputStream := nil;
1194   FOutputStream := nil;
1195 end;
1196 
1197 procedure TStreamTransportImpl.Flush;
1198 begin
1199   if FOutputStream = nil then begin
1200     raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );
1201   end;
1202 
1203   FOutputStream.Flush;
1204 end;
1205 
GetInputStreamnull1206 function TStreamTransportImpl.GetInputStream: IThriftStream;
1207 begin
1208   Result := FInputStream;
1209 end;
1210 
GetIsOpennull1211 function TStreamTransportImpl.GetIsOpen: Boolean;
1212 begin
1213   Result := True;
1214 end;
1215 
GetOutputStreamnull1216 function TStreamTransportImpl.GetOutputStream: IThriftStream;
1217 begin
1218   Result := FOutputStream;
1219 end;
1220 
1221 procedure TStreamTransportImpl.Open;
1222 begin
1223   // nothing to do
1224 end;
1225 
Readnull1226 function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
1227 begin
1228   if FInputStream = nil
1229   then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
1230 
1231   Result := FInputStream.Read( pBuf,buflen, off, len );
1232   CountConsumedMessageBytes( result);
1233 end;
1234 
1235 procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
1236 begin
1237   if FOutputStream = nil
1238   then raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );
1239 
1240   FOutputStream.Write( pBuf, off, len );
1241 end;
1242 
1243 { TBufferedTransportImpl }
1244 
1245 constructor TBufferedTransportImpl.Create( const aTransport : IStreamTransport; const aBufSize: Integer);
1246 begin
1247   ASSERT( aTransport <> nil);
1248   inherited Create( aTransport);
1249   FBufSize := aBufSize;
1250   InitBuffers;
1251 end;
1252 
1253 procedure TBufferedTransportImpl.Close;
1254 begin
1255   InnerTransport.Close;
1256   FInputBuffer := nil;
1257   FOutputBuffer := nil;
1258 end;
1259 
1260 procedure TBufferedTransportImpl.Flush;
1261 begin
1262   if FOutputBuffer <> nil then begin
1263     FOutputBuffer.Flush;
1264   end;
1265 end;
1266 
TBufferedTransportImpl.GetIsOpennull1267 function TBufferedTransportImpl.GetIsOpen: Boolean;
1268 begin
1269   Result := InnerTransport.IsOpen;
1270 end;
1271 
1272 procedure TBufferedTransportImpl.InitBuffers;
1273 begin
1274   if InnerTransport.InputStream <> nil then begin
1275     FInputBuffer := TBufferedStreamImpl.Create( InnerTransport.InputStream, FBufSize );
1276   end;
1277   if InnerTransport.OutputStream <> nil then begin
1278     FOutputBuffer := TBufferedStreamImpl.Create( InnerTransport.OutputStream, FBufSize );
1279   end;
1280 end;
1281 
1282 procedure TBufferedTransportImpl.Open;
1283 begin
1284   InnerTransport.Open;
1285   InitBuffers;  // we need to get the buffers to match FTransport substreams again
1286 end;
1287 
Readnull1288 function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
1289 begin
1290   if FInputBuffer <> nil
1291   then Result := FInputBuffer.Read( pBuf,buflen, off, len )
1292   else Result := 0;
1293 end;
1294 
1295 procedure TBufferedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
1296 begin
1297   if FOutputBuffer <> nil then begin
1298     FOutputBuffer.Write( pBuf, off, len );
1299   end;
1300 end;
1301 
1302 procedure TBufferedTransportImpl.CheckReadBytesAvailable( const value : Int64);
1303 var buffered, need : Int64;
1304 begin
1305   need := value;
1306 
1307   // buffered bytes
1308   buffered := FInputBuffer.Size - FInputBuffer.Position;
1309   if buffered < need
1310   then InnerTransport.CheckReadBytesAvailable( need - buffered);
1311 end;
1312 
1313 
1314 { TBufferedTransportImpl.TFactory }
1315 
TFactorynull1316 function TBufferedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
1317 begin
1318   Result := TFramedTransportImpl.Create( aTransport);
1319 end;
1320 
1321 
1322 { TFramedTransportImpl }
1323 
1324 constructor TFramedTransportImpl.Create( const aTransport: ITransport);
1325 begin
1326   ASSERT( aTransport <> nil);
1327   inherited Create( aTransport);
1328 
1329   InitWriteBuffer;
1330 end;
1331 
1332 destructor TFramedTransportImpl.Destroy;
1333 begin
1334   FWriteBuffer.Free;
1335   FWriteBuffer := nil;
1336   FReadBuffer.Free;
1337   FReadBuffer := nil;
1338   inherited;
1339 end;
1340 
1341 procedure TFramedTransportImpl.Close;
1342 begin
1343   InnerTransport.Close;
1344 end;
1345 
1346 procedure TFramedTransportImpl.Flush;
1347 var
1348   buf : TBytes;
1349   len : Integer;
1350   data_len : Int64;
1351 begin
1352   if not IsOpen
1353   then raise TTransportExceptionNotOpen.Create('not open');
1354 
1355   len := FWriteBuffer.Size;
1356   SetLength( buf, len);
1357   if len > 0 then begin
1358     System.Move( FWriteBuffer.Memory^, buf[0], len );
1359   end;
1360 
1361   data_len := len - SizeOf(TFramedHeader);
1362   if (0 > data_len) or (data_len > Configuration.MaxFrameSize)
1363   then raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: invalid frame size ('+IntToStr(data_len)+')')
1364   else UpdateKnownMessageSize( len);
1365 
1366   InitWriteBuffer;
1367 
1368   buf[0] := Byte($FF and (data_len shr 24));
1369   buf[1] := Byte($FF and (data_len shr 16));
1370   buf[2] := Byte($FF and (data_len shr 8));
1371   buf[3] := Byte($FF and data_len);
1372 
1373   InnerTransport.Write( buf, 0, len );
1374   InnerTransport.Flush;
1375 end;
1376 
GetIsOpennull1377 function TFramedTransportImpl.GetIsOpen: Boolean;
1378 begin
1379   Result := InnerTransport.IsOpen;
1380 end;
1381 
1382 type
1383   TAccessMemoryStream = class(TMemoryStream)
1384   end;
1385 
1386 procedure TFramedTransportImpl.InitWriteBuffer;
1387 const DUMMY_HEADER : TFramedHeader = 0;
1388 begin
1389   FreeAndNil( FWriteBuffer);
1390   FWriteBuffer := TMemoryStream.Create;
1391   TAccessMemoryStream(FWriteBuffer).Capacity := 1024;
1392   FWriteBuffer.Write( DUMMY_HEADER, SizeOf(DUMMY_HEADER));
1393 end;
1394 
1395 procedure TFramedTransportImpl.Open;
1396 begin
1397   InnerTransport.Open;
1398 end;
1399 
Readnull1400 function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
1401 var pTmp : PByte;
1402 begin
1403   if len > (buflen-off)
1404   then len := buflen-off;
1405 
1406   pTmp := pBuf;
1407   Inc( pTmp, off);
1408 
1409   if (FReadBuffer <> nil) and (len > 0) then begin
1410     result := FReadBuffer.Read( pTmp^, len);
1411     if result > 0 then Exit;
1412   end;
1413 
1414   ReadFrame;
1415   if len > 0
1416   then Result := FReadBuffer.Read( pTmp^, len)
1417   else Result := 0;
1418 end;
1419 
1420 procedure TFramedTransportImpl.ReadFrame;
1421 var
1422   i32rd : packed array[0..SizeOf(TFramedHeader)-1] of Byte;
1423   size : Integer;
1424   buff : TBytes;
1425 begin
1426   InnerTransport.ReadAll( @i32rd[0], SizeOf(i32rd), 0, SizeOf(i32rd));
1427   size :=
1428     ((i32rd[0] and $FF) shl 24) or
1429     ((i32rd[1] and $FF) shl 16) or
1430     ((i32rd[2] and $FF) shl 8) or
1431      (i32rd[3] and $FF);
1432 
1433   if size < 0 then begin
1434     Close();
1435     raise TTransportExceptionCorruptedData.Create('Read a negative frame size ('+IntToStr(size)+')');
1436   end;
1437 
1438   if Int64(size) > Int64(Configuration.MaxFrameSize) then begin
1439     Close();
1440     raise TTransportExceptionCorruptedData.Create('Frame size ('+IntToStr(size)+') larger than allowed maximum ('+IntToStr(Configuration.MaxFrameSize)+')');
1441   end;
1442 
1443   UpdateKnownMessageSize(size + SizeOf(size));
1444 
1445   SetLength( buff, size );
1446   InnerTransport.ReadAll( buff, 0, size );
1447 
1448   FreeAndNil( FReadBuffer);
1449   FReadBuffer := TMemoryStream.Create;
1450   if Length(buff) > 0
1451   then FReadBuffer.Write( Pointer(@buff[0])^, size );
1452   FReadBuffer.Position := 0;
1453 end;
1454 
1455 procedure TFramedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
1456 var pTmp : PByte;
1457 begin
1458   if len > 0 then begin
1459     pTmp := pBuf;
1460     Inc( pTmp, off);
1461 
1462     FWriteBuffer.Write( pTmp^, len );
1463   end;
1464 end;
1465 
1466 
1467 procedure TFramedTransportImpl.CheckReadBytesAvailable( const value : Int64);
1468 var buffered, need : Int64;
1469 begin
1470   need := value;
1471 
1472   // buffered bytes
1473   buffered := FReadBuffer.Size - FReadBuffer.Position;
1474   if buffered < need
1475   then InnerTransport.CheckReadBytesAvailable( need - buffered);
1476 end;
1477 
1478 
1479 { TFramedTransport.TFactory }
1480 
TFactorynull1481 function TFramedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
1482 begin
1483   Result := TFramedTransportImpl.Create( aTransport);
1484 end;
1485 
1486 { TTcpSocketStreamImpl }
1487 
1488 procedure TTcpSocketStreamImpl.Close;
1489 begin
1490   FTcpClient.Close;
1491 end;
1492 
1493 {$IFDEF OLD_SOCKETS}
1494 constructor TTcpSocketStreamImpl.Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer);
1495 begin
1496   inherited Create;
1497   FTcpClient := aTcpClient;
1498   FTimeout := aTimeout;
1499 end;
1500 {$ELSE}
1501 constructor TTcpSocketStreamImpl.Create( const aTcpClient: TSocket; const aTimeout : Longword);
1502 begin
1503   inherited Create;
1504   FTcpClient := aTcpClient;
1505   if aTimeout = 0 then
1506     FTcpClient.RecvTimeout := SLEEP_TIME
1507   else
1508     FTcpClient.RecvTimeout := aTimeout;
1509   FTcpClient.SendTimeout := aTimeout;
1510 end;
1511 {$ENDIF}
1512 
1513 procedure TTcpSocketStreamImpl.Flush;
1514 begin
1515   // nothing to do
1516 end;
1517 
1518 
TTcpSocketStreamImpl.IsOpennull1519 function TTcpSocketStreamImpl.IsOpen: Boolean;
1520 begin
1521 {$IFDEF OLD_SOCKETS}
1522   Result := FTcpClient.Active;
1523 {$ELSE}
1524   Result := FTcpClient.IsOpen;
1525 {$ENDIF}
1526 end;
1527 
1528 procedure TTcpSocketStreamImpl.Open;
1529 begin
1530   FTcpClient.Open;
1531 end;
1532 
1533 
1534 {$IFDEF OLD_SOCKETS}
TTcpSocketStreamImpl.Selectnull1535 function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
1536                                       TimeOut: Integer; var wsaError : Integer): Integer;
1537 var
1538   ReadFds: TFDset;
1539   ReadFdsptr: PFDset;
1540   WriteFds: TFDset;
1541   WriteFdsptr: PFDset;
1542   ExceptFds: TFDset;
1543   ExceptFdsptr: PFDset;
1544   tv: timeval;
1545   Timeptr: PTimeval;
1546   socket : TSocket;
1547 begin
1548   if not FTcpClient.Active then begin
1549     wsaError := WSAEINVAL;
1550     Exit( SOCKET_ERROR);
1551   end;
1552 
1553   socket := FTcpClient.Handle;
1554 
1555   if Assigned(ReadReady) then begin
1556     ReadFdsptr := @ReadFds;
1557     FD_ZERO(ReadFds);
1558     FD_SET(socket, ReadFds);
1559   end
1560   else begin
1561     ReadFdsptr := nil;
1562   end;
1563 
1564   if Assigned(WriteReady) then begin
1565     WriteFdsptr := @WriteFds;
1566     FD_ZERO(WriteFds);
1567     FD_SET(socket, WriteFds);
1568   end
1569   else begin
1570     WriteFdsptr := nil;
1571   end;
1572 
1573   if Assigned(ExceptFlag) then begin
1574     ExceptFdsptr := @ExceptFds;
1575     FD_ZERO(ExceptFds);
1576     FD_SET(socket, ExceptFds);
1577   end
1578   else begin
1579     ExceptFdsptr := nil;
1580   end;
1581 
1582   if TimeOut >= 0 then begin
1583     tv.tv_sec := TimeOut div 1000;
1584     tv.tv_usec :=  1000 * (TimeOut mod 1000);
1585     Timeptr := @tv;
1586   end
1587   else begin
1588     Timeptr := nil;  // wait forever
1589   end;
1590 
1591   wsaError := 0;
1592   try
1593     {$IFDEF MSWINDOWS}
1594       {$IFDEF OLD_UNIT_NAMES}
1595       result := WinSock.select(        socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1596       {$ELSE}
1597       result := Winapi.WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1598       {$ENDIF}
1599     {$ENDIF}
1600     {$IFDEF LINUX}
1601       result := Libc.select(           socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1602     {$ENDIF}
1603 
1604     if result = SOCKET_ERROR
1605     then wsaError := WSAGetLastError;
1606 
1607   except
1608     result := SOCKET_ERROR;
1609   end;
1610 
1611   if Assigned(ReadReady) then
1612    ReadReady^ := FD_ISSET(socket, ReadFds);
1613 
1614   if Assigned(WriteReady) then
1615     WriteReady^ := FD_ISSET(socket, WriteFds);
1616 
1617   if Assigned(ExceptFlag) then
1618     ExceptFlag^ := FD_ISSET(socket, ExceptFds);
1619 end;
1620 {$ENDIF}
1621 
1622 {$IFDEF OLD_SOCKETS}
WaitForDatanull1623 function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
1624                                            DesiredBytes : Integer;
1625                                            var wsaError, bytesReady : Integer): TWaitForData;
1626 var bCanRead, bError : Boolean;
1627     retval : Integer;
1628 const
1629   MSG_PEEK = {$IFDEF OLD_UNIT_NAMES} WinSock.MSG_PEEK  {$ELSE} Winapi.WinSock.MSG_PEEK  {$ENDIF};
1630 begin
1631   bytesReady := 0;
1632 
returnsnull1633   // The select function returns the total number of socket handles that are ready
1634   // and contained in the fd_set structures, zero if the time limit expired,
1635   // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1636   // WSAGetLastError can be used to retrieve a specific error code.
1637   retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
1638   if retval = SOCKET_ERROR
1639   then Exit( TWaitForData.wfd_Error);
1640   if (retval = 0) or not bCanRead
1641   then Exit( TWaitForData.wfd_Timeout);
1642 
1643   // recv() returns the number of bytes received, or -1 if an error occurred.
1644   // The return value will be 0 when the peer has performed an orderly shutdown.
1645 
1646   retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, MSG_PEEK);
1647   if retval <= 0
1648   then Exit( TWaitForData.wfd_Error);
1649 
1650   // at least we have some data
1651   bytesReady := Min( retval, DesiredBytes);
1652   result := TWaitForData.wfd_HaveData;
1653 end;
1654 {$ENDIF}
1655 
1656 {$IFDEF OLD_SOCKETS}
Readnull1657 function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
1658 // old sockets version
1659 var wfd : TWaitForData;
1660     wsaError,
1661     msecs : Integer;
1662     nBytes : Integer;
1663     pTmp : PByte;
1664 begin
1665   inherited;
1666 
1667   if FTimeout > 0
1668   then msecs := FTimeout
1669   else msecs := DEFAULT_THRIFT_TIMEOUT;
1670 
1671   result := 0;
1672   pTmp   := pBuf;
1673   Inc( pTmp, offset);
1674   while (count > 0) and (result = 0) do begin
1675 
1676     while TRUE do begin
1677       wfd := WaitForData( msecs, pTmp, count, wsaError, nBytes);
1678       case wfd of
1679         TWaitForData.wfd_Error    :  Exit;
1680         TWaitForData.wfd_HaveData :  Break;
1681         TWaitForData.wfd_Timeout  :  begin
1682           if (FTimeout = 0)
1683           then Exit
1684           else raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
1685         end;
1686       else
1687         ASSERT( FALSE);
1688       end;
1689     end;
1690 
1691     // reduce the timeout once we got data
1692     if FTimeout > 0
1693     then msecs := FTimeout div 10
1694     else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
1695     msecs := Max( msecs, 200);
1696 
1697     ASSERT( nBytes <= count);
1698     nBytes := FTcpClient.ReceiveBuf( pTmp^, nBytes);
1699     Inc( pTmp, nBytes);
1700     Dec( count, nBytes);
1701     Inc( result, nBytes);
1702   end;
1703 end;
1704 
ToArraynull1705 function TTcpSocketStreamImpl.ToArray: TBytes;
1706 // old sockets version
1707 var len : Integer;
1708 begin
1709   len := 0;
1710   if IsOpen then begin
1711     len := FTcpClient.BytesReceived;
1712   end;
1713 
1714   SetLength( Result, len );
1715 
1716   if len > 0 then begin
1717     FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);
1718   end;
1719 end;
1720 
1721 procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
1722 // old sockets version
1723 var bCanWrite, bError : Boolean;
1724     retval, wsaError : Integer;
1725     pTmp : PByte;
1726 begin
1727   inherited;
1728 
1729   if not FTcpClient.Active
1730   then raise TTransportExceptionNotOpen.Create('not open');
1731 
returnsnull1732   // The select function returns the total number of socket handles that are ready
1733   // and contained in the fd_set structures, zero if the time limit expired,
1734   // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1735   // WSAGetLastError can be used to retrieve a specific error code.
1736   retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);
1737   if retval = SOCKET_ERROR
1738   then raise TTransportExceptionUnknown.Create(SysErrorMessage(Cardinal(wsaError)));
1739 
1740   if (retval = 0)
1741   then raise TTransportExceptionTimedOut.Create('timed out');
1742 
1743   if bError or not bCanWrite
1744   then raise TTransportExceptionUnknown.Create('unknown error');
1745 
1746   pTmp := pBuf;
1747   Inc( pTmp, offset);
1748   FTcpClient.SendBuf( pTmp^, count);
1749 end;
1750 
1751 {$ELSE}
1752 
Readnull1753 function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
1754 // new sockets version
1755 var nBytes : Integer;
1756     pTmp : PByte;
1757 begin
1758   inherited;
1759 
1760   result := 0;
1761   pTmp   := pBuf;
1762   Inc( pTmp, offset);
1763   while count > 0 do begin
1764     nBytes := FTcpClient.Read( pTmp^, count);
1765     if nBytes = 0 then Exit;
1766     Inc( pTmp, nBytes);
1767     Dec( count, nBytes);
1768     Inc( result, nBytes);
1769   end;
1770 end;
1771 
ToArraynull1772 function TTcpSocketStreamImpl.ToArray: TBytes;
1773 // new sockets version
1774 var len : Integer;
1775 begin
1776   len := 0;
1777   try
1778     if FTcpClient.Peek then
1779       repeat
1780         SetLength(Result, Length(Result) + 1024);
1781         len := FTcpClient.Read(Result[Length(Result) - 1024], 1024);
1782       until len < 1024;
1783   except
1784     on TTransportException do begin { don't allow default exceptions } end;
1785     else raise;
1786   end;
1787   if len > 0 then
1788     SetLength(Result, Length(Result) - 1024 + len);
1789 end;
1790 
1791 procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
1792 // new sockets version
1793 var pTmp : PByte;
1794 begin
1795   inherited;
1796 
1797   if not FTcpClient.IsOpen
1798   then raise TTransportExceptionNotOpen.Create('not open');
1799 
1800   pTmp := pBuf;
1801   Inc( pTmp, offset);
1802   FTcpClient.Write( pTmp^, count);
1803 end;
1804 
1805 {$ENDIF}
1806 
1807 
1808 end.
1809