1 (*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
19 unit Thrift.Transport;
20
21 {$I Thrift.Defines.inc}
22 {$SCOPEDENUMS ON}
23
24 interface
25
26 uses
27 Classes,
28 SysUtils,
29 Math,
30 Generics.Collections,
31 {$IFDEF OLD_UNIT_NAMES}
32 WinSock, Sockets,
33 {$ELSE}
34 Winapi.WinSock,
35 {$IFDEF OLD_SOCKETS}
36 Web.Win.Sockets,
37 {$ELSE}
38 Thrift.Socket,
39 {$ENDIF}
40 {$ENDIF}
41 Thrift.Configuration,
42 Thrift.Collections,
43 Thrift.Exception,
44 Thrift.Utils,
45 Thrift.WinHTTP,
46 Thrift.Stream;
47
const
  // 100 MB, expressed in bytes
  DEFAULT_MAX_MESSAGE_SIZE = 100 * 1024 * 1024;

  // 5 seconds, expressed in milliseconds; used in this unit as the default
  // value of the socket constructors' timeout arguments
  DEFAULT_THRIFT_TIMEOUT   = 5 * 1000;
51
type
IStreamTransport = interface;  // forward declaration, the full interface follows further down

// Contract shared by the transports of this unit: open/close handling,
// byte-level reading and writing, plus message size bookkeeping.
ITransport = interface
  ['{52F81383-F880-492F-8AA7-A66B85B93D6B}']
  function  GetIsOpen: Boolean;
  property  IsOpen: Boolean read GetIsOpen;
  function  Peek: Boolean;
  procedure Open;
  procedure Close;

  // Reading. The pointer overloads receive the start of the buffer and its
  // total size (buflen); off and len describe the range to fill.
  function  Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
  function  Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
  function  ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
  function  ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;

  // Writing
  procedure Write( const buf: TBytes); overload;
  procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
  procedure Write( const pBuf : Pointer; off, len : Integer); overload;
  procedure Write( const pBuf : Pointer; len : Integer); overload;
  procedure Flush;

  // Configuration and message size accounting
  function  Configuration : IThriftConfiguration;
  function  MaxMessageSize : Integer;
  procedure ResetConsumedMessageSize( const knownSize : Int64 = -1);
  procedure CheckReadBytesAvailable( const numBytes : Int64);
  procedure UpdateKnownMessageSize( const size : Int64);
end;
79
// Abstract root of the transport classes. The TBytes based and the short
// Write overloads are inline convenience wrappers; descendants supply the
// abstract pointer based Read/Write primitives.
TTransportBase = class abstract( TInterfacedObject)
strict protected
  function  GetIsOpen: Boolean; virtual; abstract;
  property  IsOpen: Boolean read GetIsOpen;
  function  Peek: Boolean; virtual;
  procedure Open(); virtual; abstract;
  procedure Close(); virtual; abstract;

  // reading
  function  Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
  function  Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual; abstract;
  function  ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
  function  ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual;

  // writing
  procedure Write( const buf: TBytes); overload; inline;
  procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; inline;
  procedure Write( const pBuf : Pointer; len : Integer); overload; inline;
  procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract;
  procedure Flush; virtual;

  // configuration / message size handling, provided by descendants
  function  Configuration : IThriftConfiguration; virtual; abstract;
  procedure UpdateKnownMessageSize( const size : Int64); virtual; abstract;
end;
101
// Base class for all endpoint transports, e.g. sockets, pipes or HTTP.
// Holds the configuration and tracks how many bytes of the current
// message may still be consumed.
TEndpointTransportBase = class abstract( TTransportBase, ITransport)
strict private
  FRemainingMessageSize : Int64;
  FKnownMessageSize     : Int64;
  FConfiguration        : IThriftConfiguration;
strict protected
  function  Configuration : IThriftConfiguration; override;
  function  MaxMessageSize : Integer;
  property  RemainingMessageSize : Int64 read FRemainingMessageSize;
  property  KnownMessageSize : Int64 read FKnownMessageSize;
  procedure ResetConsumedMessageSize( const newSize : Int64 = -1);
  procedure UpdateKnownMessageSize(const size : Int64); override;
  procedure CheckReadBytesAvailable(const numBytes : Int64); inline;
  procedure CountConsumedMessageBytes(const numBytes : Int64); inline;
public
  // a nil configuration is replaced by a freshly created default one
  constructor Create( const aConfig : IThriftConfiguration); reintroduce;
end;
120
// Base class for all layered transports, e.g. framed.
// Wraps an inner transport of type T and forwards configuration and
// message size handling to it.
TLayeredTransportBase<T : ITransport> = class abstract( TTransportBase, ITransport)
strict private
  FTransport : T;
strict protected
  property  InnerTransport : T read FTransport;
  function  GetUnderlyingTransport: ITransport;
  function  Configuration : IThriftConfiguration; override;
  procedure UpdateKnownMessageSize( const size : Int64); override;
  function  MaxMessageSize : Integer; inline;
  procedure ResetConsumedMessageSize( const knownSize : Int64 = -1); inline;
  procedure CheckReadBytesAvailable( const numBytes : Int64); virtual;
public
  constructor Create( const aTransport: T); reintroduce;
  property  UnderlyingTransport: ITransport read GetUnderlyingTransport;
end;
137
// Abstract ancestor of the transport exceptions. The Create class functions
// are deprecated factories that hand out instances of the specialized
// descendants; Type_ reports the kind of the concrete class.
TTransportException = class abstract( TException)
public
  type
    TExceptionType = (
      Unknown,
      NotOpen,
      AlreadyOpen,
      TimedOut,
      EndOfFile,
      BadArgs,
      Interrupted,
      CorruptedData
    );
strict protected
  // the actual constructor, reachable by descendants only
  constructor HiddenCreate(const Msg: string);
  class function GetType: TExceptionType; virtual; abstract;
public
  class function Create( aType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
  class function Create( const msg: string): TTransportException; reintroduce; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
  class function Create( aType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
  property Type_: TExceptionType read GetType;
end;
160
// Needed to remove the deprecation warning: offers a regular, non-deprecated
// constructor that the specialized exception classes inherit.
TTransportExceptionSpecialized = class abstract (TTransportException)
public
  constructor Create(const Msg: string);
end;
166
// One specialized exception class per TTransportException.TExceptionType
// member; each one overrides GetType to report its own kind.

TTransportExceptionUnknown = class (TTransportExceptionSpecialized)
strict protected
  class function GetType: TTransportException.TExceptionType; override;
end;

TTransportExceptionNotOpen = class (TTransportExceptionSpecialized)
strict protected
  class function GetType: TTransportException.TExceptionType; override;
end;

TTransportExceptionAlreadyOpen = class (TTransportExceptionSpecialized)
strict protected
  class function GetType: TTransportException.TExceptionType; override;
end;

TTransportExceptionTimedOut = class (TTransportExceptionSpecialized)
strict protected
  class function GetType: TTransportException.TExceptionType; override;
end;

TTransportExceptionEndOfFile = class (TTransportExceptionSpecialized)
strict protected
  class function GetType: TTransportException.TExceptionType; override;
end;

TTransportExceptionBadArgs = class (TTransportExceptionSpecialized)
strict protected
  class function GetType: TTransportException.TExceptionType; override;
end;

TTransportExceptionInterrupted = class (TTransportExceptionSpecialized)
strict protected
  class function GetType: TTransportException.TExceptionType; override;
end;

// NOTE(review): declared with plain "protected", unlike its siblings which
// use "strict protected" - confirm whether that difference is intended
TTransportExceptionCorruptedData = class (TTransportExceptionSpecialized)
protected
  class function GetType: TTransportException.TExceptionType; override;
end;
206
// Protocol versions that can be selected for secured connections
TSecureProtocol = (
  // outdated, for compatibility only
  SSL_2,
  SSL_3,
  TLS_1,
  // secure (as of today)
  TLS_1_1,
  TLS_1_2
);

TSecureProtocols = set of TSecureProtocol;
213
// Transport interface of an HTTP client: adds timeout settings, custom
// headers, the selection of secure protocols and an explicit SendRequest.
IHTTPClient = interface( ITransport )
  ['{7BF615DD-8680-4004-A5B2-88947BA3BA3D}']
  // accessors behind the properties declared below
  procedure SetDnsResolveTimeout(const Value: Integer);
  function  GetDnsResolveTimeout: Integer;
  procedure SetConnectionTimeout(const Value: Integer);
  function  GetConnectionTimeout: Integer;
  procedure SetSendTimeout(const Value: Integer);
  function  GetSendTimeout: Integer;
  procedure SetReadTimeout(const Value: Integer);
  function  GetReadTimeout: Integer;
  function  GetCustomHeaders: IThriftDictionary<string,string>;
  procedure SendRequest;
  function  GetSecureProtocols : TSecureProtocols;
  procedure SetSecureProtocols( const value : TSecureProtocols);

  property DnsResolveTimeout: Integer read GetDnsResolveTimeout write SetDnsResolveTimeout;
  property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
  property SendTimeout: Integer read GetSendTimeout write SetSendTimeout;
  property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
  property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
  property SecureProtocols : TSecureProtocols read GetSecureProtocols write SetSecureProtocols;
end;
236
// Server side counterpart of ITransport: listens, accepts client
// transports and exposes its configuration.
IServerTransport = interface
  ['{FA01363F-6B40-482F-971E-4A085535EFC8}']
  procedure Listen;
  procedure Close;
  // fnAccepting is an optional callback supplied by the caller
  function  Accept( const fnAccepting: TProc): ITransport;
  function  Configuration : IThriftConfiguration;
end;
244
// Common base of the server transports: stores the configuration and
// leaves Listen, Close and Accept to the descendants.
TServerTransportImpl = class( TInterfacedObject, IServerTransport)
strict private
  FConfig : IThriftConfiguration;
strict protected
  function  Configuration : IThriftConfiguration;
  procedure Listen; virtual; abstract;
  procedure Close; virtual; abstract;
  function  Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
public
  // a nil configuration is replaced by a freshly created default one
  constructor Create( const aConfig : IThriftConfiguration);
end;
256
// Factory that produces a transport for a given transport
ITransportFactory = interface
  ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
  function GetTransport( const aTransport: ITransport): ITransport;
end;
261
// Default factory; its GetTransport hands back the transport it was given.
// Descendants override GetTransport.
TTransportFactoryImpl = class ( TInterfacedObject, ITransportFactory)
strict protected
  function GetTransport( const aTransport: ITransport): ITransport; virtual;
end;
266
267
// Thrift stream on top of a TCP client socket. The socket type and the
// helper members depend on the OLD_SOCKETS define.
TTcpSocketStreamImpl = class( TThriftStreamImpl)
{$IFDEF OLD_SOCKETS}
strict private type
  TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
strict private
  FTcpClient : TCustomIpClient;
  FTimeout : Integer;
  function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
                   TimeOut: Integer; var wsaError : Integer): Integer;
  function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
                        var wsaError, bytesReady : Integer): TWaitForData;
{$ELSE}
  FTcpClient: TSocket;
strict protected const
  SLEEP_TIME = 200;
{$ENDIF}
strict protected
  // stream operations
  procedure Write( const pBuf : Pointer; offset, count: Integer); override;
  function  Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;

  function  IsOpen: Boolean; override;
  function  ToArray: TBytes; override;
public
  {$IFDEF OLD_SOCKETS}
  constructor Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer = DEFAULT_THRIFT_TIMEOUT);
  {$ELSE}
  constructor Create( const aTcpClient: TSocket; const aTimeout : Longword = DEFAULT_THRIFT_TIMEOUT);
  {$ENDIF}
end;
300
// Transport that exposes the input and output streams it works with
IStreamTransport = interface( ITransport )
  ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
  function GetInputStream: IThriftStream;
  function GetOutputStream: IThriftStream;
  property InputStream : IThriftStream read GetInputStream;
  property OutputStream : IThriftStream read GetOutputStream;
end;
308
// Endpoint transport working on a pair of IThriftStream instances,
// one for input and one for output.
TStreamTransportImpl = class( TEndpointTransportBase, IStreamTransport)
strict protected
  FInputStream : IThriftStream;
  FOutputStream : IThriftStream;
strict protected
  function GetIsOpen: Boolean; override;

  function GetInputStream: IThriftStream;
  function GetOutputStream: IThriftStream;

strict protected
  // transport operations
  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;
  function  Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
  procedure Write( const pBuf : Pointer; off, len : Integer); override;
public
  constructor Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration = nil); reintroduce;
  destructor Destroy; override;

  property InputStream : IThriftStream read GetInputStream;
  property OutputStream : IThriftStream read GetOutputStream;
end;
332
// Thrift stream that wraps another stream and keeps separate
// memory buffers for reading and writing.
TBufferedStreamImpl = class( TThriftStreamImpl)
strict private
  FStream : IThriftStream;
  FBufSize : Integer;
  FReadBuffer : TMemoryStream;
  FWriteBuffer : TMemoryStream;
strict protected
  procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
  function  Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;
  function  IsOpen: Boolean; override;
  function  ToArray: TBytes; override;
  function  Size : Int64; override;
  function  Position : Int64; override;
public
  constructor Create( const aStream: IThriftStream; const aBufSize : Integer);
  destructor Destroy; override;
end;
353
// Socket based server transport. It either wraps a server socket supplied
// by the caller or creates (and then owns) one for a given port.
TServerSocketImpl = class( TServerTransportImpl)
strict private
  {$IFDEF OLD_SOCKETS}
  FServer : TTcpServer;
  FPort : Integer;
  FClientTimeout : Integer;
  {$ELSE}
  FServer: TServerSocket;
  {$ENDIF}
  FUseBufferedSocket : Boolean;  // wrap accepted clients into a buffered transport
  FOwnsServer : Boolean;         // FServer is freed by the destructor when set

strict protected
  function Accept( const fnAccepting: TProc) : ITransport; override;

public
  {$IFDEF OLD_SOCKETS}
  constructor Create( const aServer: TTcpServer; const aClientTimeout : Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
  constructor Create( const aPort: Integer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil); overload;
  {$ELSE}
  constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
  constructor Create( const aPort: Integer; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil); overload;
  {$ENDIF}

  destructor Destroy; override;
  procedure Listen; override;
  procedure Close; override;
end;
382
// Layered transport around an IStreamTransport that works with an input
// and an output buffer stream; aBufSize defaults to 1024.
TBufferedTransportImpl = class( TLayeredTransportBase<IStreamTransport>)
strict private
  FInputBuffer : IThriftStream;
  FOutputBuffer : IThriftStream;
  FBufSize : Integer;

  procedure InitBuffers;
strict protected
  function GetIsOpen: Boolean; override;
  procedure Flush; override;
public
  type
    // factory producing buffered transports
    TFactory = class( TTransportFactoryImpl )
    public
      function GetTransport( const aTransport: ITransport): ITransport; override;
    end;

  constructor Create( const aTransport : IStreamTransport; const aBufSize: Integer = 1024);
  procedure Open(); override;
  procedure Close(); override;
  function  Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
  procedure Write( const pBuf : Pointer; off, len : Integer); override;
  procedure CheckReadBytesAvailable( const value : Int64); override;
  property IsOpen: Boolean read GetIsOpen;
end;
408
// Stream transport on top of a client socket. The socket is either passed
// in by the caller (optionally with ownership) or described by host and port.
TSocketImpl = class(TStreamTransportImpl)
strict private
  {$IFDEF OLD_SOCKETS}
  FClient : TCustomIpClient;
  {$ELSE}
  FClient: TSocket;
  {$ENDIF}
  FOwnsClient : Boolean;  // FClient is freed by this object when set
  FHost : string;
  FPort : Integer;
  {$IFDEF OLD_SOCKETS}
  FTimeout : Integer;
  {$ELSE}
  FTimeout : Longword;
  {$ENDIF}

  procedure InitSocket;
strict protected
  function GetIsOpen: Boolean; override;
public
  {$IFDEF OLD_SOCKETS}
  constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
  constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
  {$ELSE}
  constructor Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration = nil); overload;
  constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
  {$ENDIF}
  destructor Destroy; override;

  procedure Open; override;
  procedure Close; override;
  {$IFDEF OLD_SOCKETS}
  property TcpClient: TCustomIpClient read FClient;
  {$ELSE}
  property TcpClient: TSocket read FClient;
  {$ENDIF}
  property Host : string read FHost;
  property Port: Integer read FPort;
end;
448
// Layered transport for framed messages. It keeps one memory buffer for
// writing and one for reading; the frame header type is a 32 bit integer.
TFramedTransportImpl = class( TLayeredTransportBase<ITransport>)
strict protected type
  TFramedHeader = Int32;
strict protected
  FWriteBuffer : TMemoryStream;
  FReadBuffer : TMemoryStream;

  procedure InitWriteBuffer;
  procedure ReadFrame;

  procedure Open(); override;
  function  GetIsOpen: Boolean; override;

  procedure Close(); override;
  function  Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
  procedure Write( const pBuf : Pointer; off, len : Integer); override;
  procedure CheckReadBytesAvailable( const value : Int64); override;
  procedure Flush; override;

public
  type
    // factory producing framed transports
    TFactory = class( TTransportFactoryImpl )
    public
      function GetTransport( const aTransport: ITransport): ITransport; override;
    end;

  constructor Create( const aTransport: ITransport); overload;
  destructor Destroy; override;
end;
478
479
const
  // default protocol set: TLS 1.1 and TLS 1.2
  DEFAULT_THRIFT_SECUREPROTOCOLS = [ TSecureProtocol.TLS_1_1,
                                     TSecureProtocol.TLS_1_2];
482
483 implementation
484
485
486 { TTransportBase }
487
procedure TTransportBase.Flush;
// Default implementation, intentionally empty. Descendants override it.
begin
  // nothing to do
end;
492
function TTransportBase.Peek: Boolean;
// Default implementation: simply reports the IsOpen state.
begin
  Result := IsOpen;
end;
497
function TTransportBase.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
// TBytes convenience wrapper around the pointer based Read.
// Returns 0 without reading anything when buf is empty.
begin
  Result := 0;
  if Length(buf) > 0
  then Result := Read( @buf[0], Length(buf), off, len);
end;
504
function TTransportBase.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
// TBytes convenience wrapper around the pointer based ReadAll.
// Returns 0 without reading anything when buf is empty.
begin
  Result := 0;
  if Length(buf) > 0
  then Result := ReadAll( @buf[0], Length(buf), off, len);
end;
511
function TTransportBase.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
// Calls Read repeatedly until len bytes have been collected and returns the
// number of bytes read. Raises TTransportExceptionNotOpen as soon as a Read
// call delivers no data.
var collected, chunk : Integer;
begin
  collected := 0;
  while collected < len do begin
    chunk := Read( pBuf, buflen, off + collected, len - collected);
    if chunk <= 0
    then raise TTransportExceptionNotOpen.Create( 'Cannot read, Remote side has closed' );
    Inc( collected, chunk);
  end;
  Result := collected;
end;
523
procedure TTransportBase.Write( const buf: TBytes);
// Writes the whole array; an empty array is silently ignored.
var count : Integer;
begin
  count := Length(buf);
  if count > 0
  then Write( @buf[0], 0, count);
end;
529
procedure TTransportBase.Write( const buf: TBytes; off: Integer; len: Integer);
// Passes off and len on to the pointer based Write together with the start
// of the array; an empty array is silently ignored.
begin
  if Length(buf) = 0
  then Exit;

  Write( @buf[0], off, len);
end;
535
procedure TTransportBase.Write( const pBuf : Pointer; len : Integer);
// Shorthand for the three argument overload with an offset of zero.
begin
  Self.Write( pBuf, 0, len);
end;
540
541
542 { TEndpointTransportBase }
543
constructor TEndpointTransportBase.Create( const aConfig : IThriftConfiguration);
// Stores the configuration, falling back to a new TThriftConfigurationImpl
// when none was passed, and initializes the message size counters.
begin
  inherited Create;

  FConfiguration := aConfig;
  if FConfiguration = nil
  then FConfiguration := TThriftConfigurationImpl.Create;

  ResetConsumedMessageSize;
end;
554
555
function TEndpointTransportBase.Configuration : IThriftConfiguration;
// Returns the configuration assigned by the constructor.
begin
  Result := FConfiguration;
end;
560
561
function TEndpointTransportBase.MaxMessageSize : Integer;
// Returns the MaxMessageSize value of the configuration.
begin
  ASSERT( Configuration <> nil);
  Result := Configuration.MaxMessageSize;
end;
567
568
procedure TEndpointTransportBase.ResetConsumedMessageSize( const newSize : Int64);
// Negative newSize: both the known and the remaining message size are reset
// to the configured maximum.
// Otherwise: both are set to newSize, provided newSize does not exceed the
// currently known message size; if it does, TTransportExceptionEndOfFile is raised.
begin
  if newSize < 0 then begin
    // full reset
    FKnownMessageSize     := MaxMessageSize;
    FRemainingMessageSize := MaxMessageSize;
  end
  else begin
    // update only: message size can shrink, but not grow
    ASSERT( KnownMessageSize <= MaxMessageSize);
    if newSize > KnownMessageSize
    then raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');

    FKnownMessageSize     := newSize;
    FRemainingMessageSize := newSize;
  end;
end;
587
588
procedure TEndpointTransportBase.UpdateKnownMessageSize( const size : Int64);
// Updates RemainingMessageSize to reflect the known real message size
// (e.g. framed transport). The bytes consumed so far are carried over to the
// new size, which raises if we already consumed too many bytes.
var alreadyConsumed : Int64;
begin
  // remember the consumption before the counters are overwritten
  alreadyConsumed := KnownMessageSize - RemainingMessageSize;

  ResetConsumedMessageSize( size);
  CountConsumedMessageBytes( alreadyConsumed);
end;
598
599
procedure TEndpointTransportBase.CheckReadBytesAvailable( const numBytes : Int64);
// Raises TTransportExceptionEndOfFile when numBytes exceeds the
// remaining message size; does nothing otherwise.
begin
  if numBytes > RemainingMessageSize
  then raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
end;
606
607
procedure TEndpointTransportBase.CountConsumedMessageBytes( const numBytes : Int64);
// Subtracts numBytes from the remaining message size. When fewer bytes are
// left, the remaining size is set to zero and TTransportExceptionEndOfFile
// is raised.
begin
  if numBytes > RemainingMessageSize then begin
    FRemainingMessageSize := 0;
    raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
  end;

  Dec( FRemainingMessageSize, numBytes);
end;
618
619 { TLayeredTransportBase }
620
constructor TLayeredTransportBase<T>.Create( const aTransport: T);
// Remembers the transport this layer is stacked upon.
begin
  inherited Create;

  FTransport := aTransport;
end;
626
function TLayeredTransportBase<T>.GetUnderlyingTransport: ITransport;
// Hands out the inner transport as plain ITransport.
begin
  Result := InnerTransport;
end;
631
function TLayeredTransportBase<T>.Configuration : IThriftConfiguration;
// The configuration is the one of the inner transport.
begin
  Result := InnerTransport.Configuration;
end;
636
procedure TLayeredTransportBase<T>.UpdateKnownMessageSize( const size : Int64);
// Forwarded to the inner transport.
begin
  InnerTransport.UpdateKnownMessageSize( size);
end;
641
642
function TLayeredTransportBase<T>.MaxMessageSize : Integer;
// Forwarded to the inner transport.
begin
  Result := InnerTransport.MaxMessageSize;
end;
647
648
procedure TLayeredTransportBase<T>.ResetConsumedMessageSize( const knownSize : Int64 = -1);
// Forwarded to the inner transport.
begin
  InnerTransport.ResetConsumedMessageSize( knownSize);
end;
653
654
procedure TLayeredTransportBase<T>.CheckReadBytesAvailable( const numBytes : Int64);
// Default behaviour: forwarded to the inner transport.
begin
  InnerTransport.CheckReadBytesAvailable( numBytes);
end;
659
660
661
662 { TTransportException }
663
constructor TTransportException.HiddenCreate(const Msg: string);
// The real constructor; it only passes the message on to the ancestor.
begin
  inherited Create( Msg);
end;
668
class function TTransportException.Create(aType: TExceptionType): TTransportException;
// Deprecated factory: same as Create(aType, msg) with an empty message.
begin
  // no inherited call here; the deprecation warning for the overload being
  // called is switched off locally
  {$WARN SYMBOL_DEPRECATED OFF}
  Result := Create( aType, '')
  {$WARN SYMBOL_DEPRECATED DEFAULT}
end;
676
class function TTransportException.Create(aType: TExceptionType; const msg: string): TTransportException;
// Deprecated factory: creates the specialized exception class that matches
// aType, carrying msg as its message.
//
// Every TExceptionType member has its own branch. CorruptedData used to be
// missing, so it ended up in the else branch, where it tripped the assertion
// and was reported as TTransportExceptionUnknown.
begin
  case aType of
    TExceptionType.NotOpen:        Result := TTransportExceptionNotOpen.Create(msg);
    TExceptionType.AlreadyOpen:    Result := TTransportExceptionAlreadyOpen.Create(msg);
    TExceptionType.TimedOut:       Result := TTransportExceptionTimedOut.Create(msg);
    TExceptionType.EndOfFile:      Result := TTransportExceptionEndOfFile.Create(msg);
    TExceptionType.BadArgs:        Result := TTransportExceptionBadArgs.Create(msg);
    TExceptionType.Interrupted:    Result := TTransportExceptionInterrupted.Create(msg);
    TExceptionType.CorruptedData:  Result := TTransportExceptionCorruptedData.Create(msg);
  else
    // only Unknown is expected to arrive here
    ASSERT( TExceptionType.Unknown = aType);
    Result := TTransportExceptionUnknown.Create(msg);
  end;
end;
691
class function TTransportException.Create(const msg: string): TTransportException;
// Deprecated factory: without a type the result is a TTransportExceptionUnknown.
begin
  Result := TTransportExceptionUnknown.Create( msg);
end;
696
697 { TTransportExceptionSpecialized }
698
constructor TTransportExceptionSpecialized.Create(const Msg: string);
// Non-deprecated constructor, routed to the ancestor's HiddenCreate.
begin
  inherited HiddenCreate( Msg);
end;
703
704 { specialized TTransportExceptions }
705
class function TTransportExceptionUnknown.GetType: TTransportException.TExceptionType;
// Reports the kind of this exception class.
begin
  Result := TExceptionType.Unknown;
end;
710
class function TTransportExceptionNotOpen.GetType: TTransportException.TExceptionType;
// Reports the kind of this exception class.
begin
  Result := TExceptionType.NotOpen;
end;
715
class function TTransportExceptionAlreadyOpen.GetType: TTransportException.TExceptionType;
// Reports the kind of this exception class.
begin
  Result := TExceptionType.AlreadyOpen;
end;
720
class function TTransportExceptionTimedOut.GetType: TTransportException.TExceptionType;
// Reports the kind of this exception class.
begin
  Result := TExceptionType.TimedOut;
end;
725
class function TTransportExceptionEndOfFile.GetType: TTransportException.TExceptionType;
// Reports the kind of this exception class.
begin
  Result := TExceptionType.EndOfFile;
end;
730
class function TTransportExceptionBadArgs.GetType: TTransportException.TExceptionType;
// Reports the kind of this exception class.
begin
  Result := TExceptionType.BadArgs;
end;
735
class function TTransportExceptionInterrupted.GetType: TTransportException.TExceptionType;
// Reports the kind of this exception class.
begin
  Result := TExceptionType.Interrupted;
end;
740
class function TTransportExceptionCorruptedData.GetType: TTransportException.TExceptionType;
// Reports the kind of this exception class.
begin
  Result := TExceptionType.CorruptedData;
end;
745
746 { TTransportFactoryImpl }
747
function TTransportFactoryImpl.GetTransport( const aTransport: ITransport): ITransport;
// Default factory behaviour: the transport passed in is returned as it is.
begin
  Result := aTransport;
end;
752
753
754 { TServerTransportImpl }
755
constructor TServerTransportImpl.Create( const aConfig : IThriftConfiguration);
// Stores the configuration, falling back to a new TThriftConfigurationImpl
// when none was passed.
begin
  inherited Create;

  FConfig := aConfig;
  if FConfig = nil
  then FConfig := TThriftConfigurationImpl.Create;
end;
763
function TServerTransportImpl.Configuration : IThriftConfiguration;
// Returns the configuration assigned by the constructor.
begin
  Result := FConfig;
end;
768
769 { TServerSocket }
770
{$IFDEF OLD_SOCKETS}
constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer; const aConfig : IThriftConfiguration);
{$ELSE}
constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword; const aConfig : IThriftConfiguration);
{$ENDIF}
// Wraps a server socket supplied by the caller. FOwnsServer is not set here,
// so the destructor leaves aServer alone.
// OLD_SOCKETS: the client timeout is only stored.
// Otherwise:   it becomes the receive and the send timeout of the server socket.
begin
  inherited Create( aConfig);

  FServer := aServer;

  {$IFDEF OLD_SOCKETS}
  FClientTimeout := aClientTimeout;
  {$ELSE}
  FServer.RecvTimeout := aClientTimeout;
  FServer.SendTimeout := aClientTimeout;
  {$ENDIF}
end;
788
789
{$IFDEF OLD_SOCKETS}
constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration);
{$ELSE}
constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration);
{$ENDIF}
// Creates a server socket for aPort. The socket is owned by this object and
// therefore released by the destructor.
begin
  inherited Create( aConfig);

  // set in either configuration before the server socket gets created
  FOwnsServer := True;

  {$IFDEF OLD_SOCKETS}
  FPort := aPort;
  FClientTimeout := aClientTimeout;

  FServer := TTcpServer.Create( nil );
  FServer.BlockMode := bmBlocking;
  {$IF CompilerVersion >= 21.0}
  FServer.LocalPort := AnsiString( IntToStr( FPort));
  {$ELSE}
  FServer.LocalPort := IntToStr( FPort);
  {$IFEND}
  {$ELSE}
  // the client timeout is passed for both timeout arguments
  FServer := TServerSocket.Create( aPort, aClientTimeout, aClientTimeout);
  {$ENDIF}

  FUseBufferedSocket := aUseBufferedSockets;
end;
817
destructor TServerSocketImpl.Destroy;
// Releases the server socket, but only if this object owns it.
begin
  if FOwnsServer then begin
    FServer.Free;
    FServer := nil;
  end;

  inherited;
end;
826
function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
// Accepts the next client connection and wraps it into a TSocketImpl, which
// additionally gets wrapped into a TBufferedTransportImpl if buffered sockets
// were requested.
//
// fnAccepting  optional callback, invoked right before the accept call
// Returns      the client transport; with OLD_SOCKETS nil if the accept call
//              reported failure
// Raises       TTransportExceptionNotOpen if there is no server socket;
//              with OLD_SOCKETS any other exception is converted into
//              TTransportExceptionUnknown
//
// Ownership: TSocketImpl.Create is called with aOwnsClient = TRUE and stores
// that flag before doing anything else. If the constructor raises later on,
// the automatically invoked destructor already frees the client socket.
// Hence the socket must not be freed here once the constructor has been
// entered, because that would free it twice.
var
  {$IFDEF OLD_SOCKETS}
  client     : TCustomIpClient;
  handedOver : TCustomIpClient;
  {$ELSE}
  client     : TSocket;
  {$ENDIF}
  trans : IStreamTransport;
begin
  if FServer = nil
  then raise TTransportExceptionNotOpen.Create('No underlying server socket.');

  {$IFDEF OLD_SOCKETS}
  client := nil;
  try
    client := TCustomIpClient.Create(nil);

    if Assigned(fnAccepting)
    then fnAccepting();

    if not FServer.Accept( client) then begin
      FreeAndNil( client);
      Result := nil;
      Exit;
    end;

    if client = nil then begin
      Result := nil;
      Exit;
    end;

    // give up our reference before the constructor runs: from here on the
    // TSocketImpl instance is responsible for the socket, even if its
    // constructor fails
    handedOver := client;
    client     := nil;
    trans := TSocketImpl.Create( handedOver, TRUE, FClientTimeout, Configuration);

    if FUseBufferedSocket
    then Result := TBufferedTransportImpl.Create( trans)
    else Result := trans;

  except
    on E: Exception do begin
      client.Free;  // only non-nil while the socket is still ours
      raise TTransportExceptionUnknown.Create(E.ToString);
    end;
  end;
  {$ELSE}
  if Assigned(fnAccepting)
  then fnAccepting();

  client := FServer.Accept;

  // trans owns the socket as soon as the constructor is entered (see above),
  // so no cleanup of client is required or allowed on failure
  trans := TSocketImpl.Create( client, TRUE, Configuration);

  if FUseBufferedSocket
  then Result := TBufferedTransportImpl.Create( trans)
  else Result := trans;
  {$ENDIF}
end;
891
procedure TServerSocketImpl.Listen;
// Puts the server socket into listening state; does nothing without a server
// socket. With OLD_SOCKETS any failure is converted into
// TTransportExceptionUnknown.
begin
  if FServer = nil
  then Exit;

  {$IFDEF OLD_SOCKETS}
  try
    FServer.Active := True;
  except
    on E: Exception
    do raise TTransportExceptionUnknown.Create('Could not accept on listening socket: ' + E.Message);
  end;
  {$ELSE}
  FServer.Listen;
  {$ENDIF}
end;
908
// Closes the underlying server socket.
// Does nothing when there is no server socket. In the OLD_SOCKETS build any
// failure is re-raised as TTransportExceptionUnknown.
procedure TServerSocketImpl.Close;
begin
  if FServer = nil
  then Exit;

  {$IFDEF OLD_SOCKETS}
  try
    FServer.Active := False;
  except
    on E: Exception do begin
      raise TTransportExceptionUnknown.Create('Error on closing socket : ' + E.Message);
    end;
  end;
  {$ELSE}
  FServer.Close;
  {$ENDIF}
end;
923
{ TSocketImpl }
925
// Wraps an already existing client socket into a socket transport.
//   aClient     : the socket object to use
//   aOwnsClient : when TRUE, the transport frees aClient on Close/Destroy
//   aTimeout    : (OLD_SOCKETS only) timeout value stored in FTimeout
//   aConfig     : configuration handed on to the inherited constructor
// Without OLD_SOCKETS the timeout is taken from aClient.RecvTimeout.
{$IFDEF OLD_SOCKETS}
constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer; const aConfig : IThriftConfiguration);
{$ELSE}
constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration);
{$ENDIF}
var
  sockStream : IThriftStream;
begin
  FOwnsClient := aOwnsClient;
  FClient     := aClient;
  FTimeout    := {$IFDEF OLD_SOCKETS} aTimeout {$ELSE} aClient.RecvTimeout {$ENDIF};

  // one stream object serves as both input and output stream
  sockStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  inherited Create( sockStream, sockStream, aConfig);
end;
945
946
// Creates a socket transport for the given host and port.
// The streams are passed as nil to the inherited constructor; InitSocket then
// creates the client socket and its stream.
{$IFDEF OLD_SOCKETS}
constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer; const aConfig : IThriftConfiguration);
{$ELSE}
constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword; const aConfig : IThriftConfiguration);
{$ENDIF}
begin
  inherited Create( nil, nil, aConfig);

  FTimeout := aTimeout;
  FPort    := aPort;
  FHost    := aHost;

  InitSocket;
end;
959
// Frees the client socket if this transport owns it.
destructor TSocketImpl.Destroy;
begin
  if FOwnsClient then begin
    FreeAndNil( FClient);
  end;

  inherited;
end;
966
// Closes the transport: runs the inherited Close, drops both stream references
// and releases the client socket (freed when owned, otherwise only forgotten).
procedure TSocketImpl.Close;
begin
  inherited Close;

  FOutputStream := nil;
  FInputStream  := nil;

  if not FOwnsClient
  then FClient := nil
  else FreeAndNil( FClient);
end;
978
// Returns TRUE when a client socket exists and reports being connected/open.
function TSocketImpl.GetIsOpen: Boolean;
begin
  Result := FALSE;

  if FClient <> nil then begin
    {$IFDEF OLD_SOCKETS}
    Result := FClient.Connected;
    {$ELSE}
    Result := FClient.IsOpen;
    {$ENDIF}
  end;
end;
987
// Discards the current client socket (freeing it when owned), creates a new
// one owned by this transport and attaches a fresh socket stream as both
// input and output stream.
procedure TSocketImpl.InitSocket;
var
  sockStream : IThriftStream;
begin
  if not FOwnsClient
  then FClient := nil
  else FreeAndNil( FClient);

  {$IFDEF OLD_SOCKETS}
  FClient := TTcpClient.Create( nil);
  {$ELSE}
  FClient := TSocket.Create(FHost, FPort);
  {$ENDIF}
  FOwnsClient := True;

  sockStream    := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  FOutputStream := sockStream;
  FInputStream  := sockStream;
end;
1007
// Connects the client socket to Host:Port.
// Raises TTransportExceptionAlreadyOpen when already connected, and
// TTransportExceptionNotOpen when host or port are not set.
procedure TSocketImpl.Open;
begin
  if IsOpen
  then raise TTransportExceptionAlreadyOpen.Create('Socket already connected');

  if FHost = ''
  then raise TTransportExceptionNotOpen.Create('Cannot open null host');

  if Port <= 0
  then raise TTransportExceptionNotOpen.Create('Cannot open without port');

  // after a Close there is no client object anymore, so create a new one
  if FClient = nil then begin
    InitSocket;
  end;

  {$IFDEF OLD_SOCKETS}
  FClient.RemoteHost := TSocketHost( Host);
  FClient.RemotePort := TSocketPort( IntToStr( Port));
  FClient.Connect;
  {$ELSE}
  FClient.Open;
  {$ENDIF}

  // input and output share one stream on the connected socket
  FInputStream  := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  FOutputStream := FInputStream;
end;
1036
{ TBufferedStreamImpl }
1038
// Flushes pending output, then detaches from the wrapped stream and frees
// both internal buffers.
// The cleanup runs in a finally block: Close is also called from the
// destructor, and an exception escaping from Flush would otherwise leave the
// buffers allocated and the stream reference alive. The exception itself is
// still propagated to the caller.
procedure TBufferedStreamImpl.Close;
begin
  try
    Flush;
  finally
    FStream := nil;
    FreeAndNil( FReadBuffer);
    FreeAndNil( FWriteBuffer);
  end;
end;
1050
// Creates a buffering wrapper around aStream.
//   aStream  : the stream that is read from and written to
//   aBufSize : chunk size for reads; writes are flushed once the write buffer
//              grows beyond it
constructor TBufferedStreamImpl.Create( const aStream: IThriftStream; const aBufSize : Integer);
begin
  inherited Create;

  FBufSize     := aBufSize;
  FStream      := aStream;
  FWriteBuffer := TMemoryStream.Create;
  FReadBuffer  := TMemoryStream.Create;
end;
1059
// Closes the stream (which flushes pending output and frees the buffers)
// before the instance goes away.
destructor TBufferedStreamImpl.Destroy;
begin
  Self.Close;
  inherited Destroy;
end;
1065
// Writes everything collected in the write buffer to the wrapped stream and
// empties the buffer. Does nothing while the stream is not open.
procedure TBufferedStreamImpl.Flush;
var
  pending   : TBytes;
  cbPending : Integer;
begin
  if not IsOpen
  then Exit;

  cbPending := FWriteBuffer.Size;
  if cbPending > 0 then begin
    // copy the buffered bytes out, then hand them to the wrapped stream
    SetLength( pending, cbPending);
    FWriteBuffer.Position := 0;
    FWriteBuffer.Read( Pointer(@pending[0])^, cbPending);
    FStream.Write( pending, 0, cbPending);
  end;

  FWriteBuffer.Clear;
end;
1082
// TRUE when both buffers and the wrapped stream exist and the wrapped stream
// reports being open.
function TBufferedStreamImpl.IsOpen: Boolean;
begin
  Result := FALSE;

  if (FWriteBuffer = nil) or (FReadBuffer = nil) or (FStream = nil)
  then Exit;

  Result := FStream.IsOpen;
end;
1090
// Opens the wrapped stream.
// NOTE(review): Close sets FStream to nil, so calling Open afterwards looks
// like it would fail - confirm that reopening is not expected here.
procedure TBufferedStreamImpl.Open;
begin
  FStream.Open;
end;
1095
// Reads up to count bytes into pBuf, starting at the given offset, serving
// them from the internal read buffer and refilling that buffer from the
// wrapped stream in chunks of FBufSize when it is exhausted.
// Returns the number of bytes copied; this is less than count when the
// wrapped stream delivers no more data, and 0 when the stream is not open.
function TBufferedStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
var
  chunk  : Integer;
  refill : TBytes;
  pDest  : PByte;
begin
  inherited;
  Result := 0;

  if not IsOpen
  then Exit;

  while count > 0 do begin

    // read buffer fully consumed -> fetch the next chunk from the wrapped stream
    if FReadBuffer.Position >= FReadBuffer.Size then begin
      FReadBuffer.Clear;
      SetLength( refill, FBufSize);
      chunk := FStream.Read( refill, 0, FBufSize);
      if chunk = 0
      then Break;  // nothing more to get, avoid infinite loop

      FReadBuffer.WriteBuffer( Pointer(@refill[0])^, chunk);
      FReadBuffer.Position := 0;
    end;

    // hand out as much buffered data as is available and still wanted
    if FReadBuffer.Position < FReadBuffer.Size then begin
      chunk := Min( FReadBuffer.Size - FReadBuffer.Position, count);
      pDest := pBuf;
      Inc( pDest, offset);
      Inc( Result, FReadBuffer.Read( pDest^, chunk));
      Dec( count, chunk);
      Inc( offset, chunk);
    end;
  end;
end;
1129
1130
// Returns a copy of the complete contents of the internal read buffer, or an
// empty array when the stream is not open.
// The bytes are copied directly from the buffer memory so that the read
// cursor stays where it is. Reading the buffer through FReadBuffer.Read
// (as done before) moved the cursor to the end, which made the next Read
// treat still unread buffered bytes as consumed.
function TBufferedStreamImpl.ToArray: TBytes;
var len : Integer;
begin
  if IsOpen
  then len := FReadBuffer.Size
  else len := 0;

  SetLength( Result, len);

  if len > 0
  then System.Move( FReadBuffer.Memory^, Result[0], len);
end;
1145
// Appends count bytes, taken from pBuf at the given offset, to the write
// buffer. Once the buffer holds more than FBufSize bytes it is flushed to the
// wrapped stream. Nothing happens for count <= 0 or while not open.
procedure TBufferedStreamImpl.Write( const pBuf : Pointer; offset: Integer; count: Integer);
var
  pSrc : PByte;
begin
  inherited;

  if count <= 0
  then Exit;

  if not IsOpen
  then Exit;

  pSrc := pBuf;
  Inc( pSrc, offset);
  FWriteBuffer.Write( pSrc^, count);

  if FWriteBuffer.Size > FBufSize
  then Flush;
end;
1161
1162
// Number of bytes currently held in the internal read buffer.
function TBufferedStreamImpl.Size : Int64;
begin
  Result := FReadBuffer.Size;
end;
1167
1168
// Current read cursor within the internal read buffer.
function TBufferedStreamImpl.Position : Int64;
begin
  Result := FReadBuffer.Position;
end;
1173
1174
1175 { TStreamTransportImpl }
1176
// Creates a transport on top of a pair of streams.
//   aInputStream  : stream used by Read
//   aOutputStream : stream used by Write and Flush
//   aConfig       : configuration passed on to the inherited constructor
constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration);
begin
  inherited Create( aConfig);

  FOutputStream := aOutputStream;
  FInputStream  := aInputStream;
end;
1183
// Releases both stream references before the instance is destroyed.
destructor TStreamTransportImpl.Destroy;
begin
  FOutputStream := nil;
  FInputStream  := nil;

  inherited Destroy;
end;
1190
// Drops the references to the input and output stream. The streams are not
// explicitly closed here.
procedure TStreamTransportImpl.Close;
begin
  FOutputStream := nil;
  FInputStream  := nil;
end;
1196
// Flushes the output stream.
// Raises TTransportExceptionNotOpen when there is no output stream.
procedure TStreamTransportImpl.Flush;
begin
  if FOutputStream = nil
  then raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );

  FOutputStream.Flush;
end;
1205
// Getter for the input stream; nil when none is attached.
function TStreamTransportImpl.GetInputStream: IThriftStream;
begin
  Result := Self.FInputStream;
end;
1210
// A plain stream transport always reports itself as open, regardless of
// whether streams are currently attached.
function TStreamTransportImpl.GetIsOpen: Boolean;
begin
  Result := TRUE;
end;
1215
// Getter for the output stream; nil when none is attached.
function TStreamTransportImpl.GetOutputStream: IThriftStream;
begin
  Result := Self.FOutputStream;
end;
1220
// Intentionally empty: this implementation performs no work on Open.
procedure TStreamTransportImpl.Open;
begin
  // nothing to do
end;
1225
// Reads up to len bytes from the input stream into pBuf at offset off and
// reports the number of bytes read to CountConsumedMessageBytes.
// Returns the number of bytes read.
// Raises TTransportExceptionNotOpen when there is no input stream.
function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
  if FInputStream = nil then begin
    raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
  end;

  Result := FInputStream.Read( pBuf, buflen, off, len);
  CountConsumedMessageBytes( Result);
end;
1234
// Writes len bytes, taken from pBuf at offset off, to the output stream.
// Raises TTransportExceptionNotOpen when there is no output stream.
procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
  if FOutputStream = nil then begin
    raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );
  end;

  FOutputStream.Write( pBuf, off, len);
end;
1242
1243 { TBufferedTransportImpl }
1244
// Creates a buffering layer on top of a stream transport.
//   aTransport : the wrapped transport, must not be nil
//   aBufSize   : buffer size handed to the buffered streams
constructor TBufferedTransportImpl.Create( const aTransport : IStreamTransport; const aBufSize: Integer);
begin
  ASSERT( aTransport <> nil);
  inherited Create( aTransport);

  FBufSize := aBufSize;
  Self.InitBuffers;
end;
1252
// Closes the wrapped transport first, then drops both buffered streams.
procedure TBufferedTransportImpl.Close;
begin
  InnerTransport.Close;

  FOutputBuffer := nil;
  FInputBuffer  := nil;
end;
1259
// Flushes the buffered output stream, if there is one.
procedure TBufferedTransportImpl.Flush;
begin
  if FOutputBuffer = nil
  then Exit;

  FOutputBuffer.Flush;
end;
1266
// The buffered transport is open exactly when the wrapped transport is.
function TBufferedTransportImpl.GetIsOpen: Boolean;
begin
  Result := Self.InnerTransport.IsOpen;
end;
1271
// Creates buffered wrappers for those streams of the wrapped transport that
// exist. A buffer whose underlying stream is nil is left untouched.
procedure TBufferedTransportImpl.InitBuffers;
begin
  if InnerTransport.InputStream <> nil
  then FInputBuffer := TBufferedStreamImpl.Create( InnerTransport.InputStream, FBufSize);

  if InnerTransport.OutputStream <> nil
  then FOutputBuffer := TBufferedStreamImpl.Create( InnerTransport.OutputStream, FBufSize);
end;
1281
// Opens the wrapped transport and re-creates the buffers afterwards, so that
// they refer to the streams the wrapped transport has after opening.
procedure TBufferedTransportImpl.Open;
begin
  InnerTransport.Open;
  Self.InitBuffers;  // buffers must match the inner transport's substreams again
end;
1287
// Reads through the buffered input stream.
// Returns the number of bytes read, or 0 when there is no input buffer.
function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
  Result := 0;

  if FInputBuffer <> nil then begin
    Result := FInputBuffer.Read( pBuf, buflen, off, len);
  end;
end;
1294
// Writes through the buffered output stream. Silently does nothing when
// there is no output buffer.
procedure TBufferedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
  if FOutputBuffer = nil
  then Exit;

  FOutputBuffer.Write( pBuf, off, len);
end;
1301
// Checks whether value bytes can still be read. Bytes already sitting in the
// input buffer count as available; only the remainder is checked against the
// wrapped transport.
// FInputBuffer is nil after Close, and also when the wrapped transport had no
// input stream at the time InitBuffers ran. In that case nothing is buffered
// and the full amount is delegated, instead of dereferencing a nil interface.
procedure TBufferedTransportImpl.CheckReadBytesAvailable( const value : Int64);
var buffered : Int64;
begin
  if FInputBuffer <> nil
  then buffered := FInputBuffer.Size - FInputBuffer.Position
  else buffered := 0;

  if buffered < value
  then InnerTransport.CheckReadBytesAvailable( value - buffered);
end;
1312
1313
1314 { TBufferedTransportImpl.TFactory }
1315
// Factory method: wraps aTransport into a new transport instance.
// NOTE(review): although this is the factory nested in TBufferedTransportImpl,
// it creates a TFramedTransportImpl. This looks like a copy/paste slip, but
// changing it would change the wire format - confirm the intent before touching it.
function TBufferedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
var
  wrapped : ITransport;
begin
  wrapped := TFramedTransportImpl.Create( aTransport);
  Result  := wrapped;
end;
1320
1321
1322 { TFramedTransportImpl }
1323
// Creates a framed transport on top of aTransport (must not be nil) and
// prepares an empty write buffer.
constructor TFramedTransportImpl.Create( const aTransport: ITransport);
begin
  ASSERT( aTransport <> nil);
  inherited Create( aTransport);
  Self.InitWriteBuffer;
end;
1331
// Frees the write and read buffers.
destructor TFramedTransportImpl.Destroy;
begin
  FreeAndNil( FWriteBuffer);
  FreeAndNil( FReadBuffer);

  inherited Destroy;
end;
1340
// Closes the wrapped transport. The frame buffers are left as they are.
procedure TFramedTransportImpl.Close;
begin
  Self.InnerTransport.Close;
end;
1345
// Sends the collected write buffer as one frame: the header placeholder at
// the start of the buffer is replaced by the payload length (big endian),
// then header and payload are written to the wrapped transport and flushed.
// Raises TTransportExceptionNotOpen when the transport is not open, and
// TTransportExceptionUnknown when the payload length is negative or exceeds
// Configuration.MaxFrameSize.
procedure TFramedTransportImpl.Flush;
var
  frame   : TBytes;
  cbFrame : Integer;
  cbData  : Int64;
begin
  if not IsOpen then begin
    raise TTransportExceptionNotOpen.Create('not open');
  end;

  // take a copy of header placeholder + payload
  cbFrame := FWriteBuffer.Size;
  SetLength( frame, cbFrame);
  if cbFrame > 0
  then System.Move( FWriteBuffer.Memory^, frame[0], cbFrame);

  cbData := cbFrame - SizeOf(TFramedHeader);
  if (cbData < 0) or (cbData > Configuration.MaxFrameSize) then begin
    raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: invalid frame size ('+IntToStr(cbData)+')');
  end;
  UpdateKnownMessageSize( cbFrame);

  // start over with a fresh buffer before anything is sent
  InitWriteBuffer;

  // payload length, most significant byte first
  frame[0] := Byte((cbData shr 24) and $FF);
  frame[1] := Byte((cbData shr 16) and $FF);
  frame[2] := Byte((cbData shr 8) and $FF);
  frame[3] := Byte(cbData and $FF);

  InnerTransport.Write( frame, 0, cbFrame);
  InnerTransport.Flush;
end;
1376
// The framed transport is open exactly when the wrapped transport is.
function TFramedTransportImpl.GetIsOpen: Boolean;
begin
  Result := Self.InnerTransport.IsOpen;
end;
1381
type
  // Empty descendant, used in InitWriteBuffer as a typecast target to set the
  // Capacity of a TMemoryStream.
  TAccessMemoryStream = class(TMemoryStream);
1385
// Replaces the write buffer by a fresh one with 1024 bytes of capacity that
// already contains a zeroed placeholder for the frame header.
procedure TFramedTransportImpl.InitWriteBuffer;
const
  INITIAL_CAPACITY = 1024;
  EMPTY_HEADER : TFramedHeader = 0;
begin
  FreeAndNil( FWriteBuffer);

  FWriteBuffer := TMemoryStream.Create;
  TAccessMemoryStream(FWriteBuffer).Capacity := INITIAL_CAPACITY;
  FWriteBuffer.Write( EMPTY_HEADER, SizeOf(EMPTY_HEADER));
end;
1394
// Opens the wrapped transport.
procedure TFramedTransportImpl.Open;
begin
  Self.InnerTransport.Open;
end;
1399
// Reads up to len bytes of frame payload into pBuf at offset off.
// Data is served from the current frame first. Only when that frame is
// exhausted (or no frame was read yet) is the next frame fetched from the
// wrapped transport.
// len is clipped to the space left in the buffer (buflen - off).
// Returns the number of bytes copied.
// A request for zero (or, after clipping, fewer) bytes returns 0 right away.
// It previously fell through to ReadFrame, which waits for a new frame on the
// wrapped transport and discards any unread data of the current one.
function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
var pTmp : PByte;
begin
  if len > (buflen-off)
  then len := buflen-off;

  if len <= 0 then begin
    Result := 0;
    Exit;
  end;

  pTmp := pBuf;
  Inc( pTmp, off);

  // try the current frame first
  if FReadBuffer <> nil then begin
    Result := FReadBuffer.Read( pTmp^, len);
    if Result > 0 then Exit;
  end;

  // current frame is used up, get the next one
  ReadFrame;
  Result := FReadBuffer.Read( pTmp^, len);
end;
1419
// Reads one complete frame from the wrapped transport into a new read buffer:
// first the header carrying the payload size (big endian), then the payload.
// Closes the transport and raises TTransportExceptionCorruptedData when the
// size is negative or larger than Configuration.MaxFrameSize.
procedure TFramedTransportImpl.ReadFrame;
var
  header  : packed array[0..SizeOf(TFramedHeader)-1] of Byte;
  size    : Integer;
  payload : TBytes;
begin
  InnerTransport.ReadAll( @header[0], SizeOf(header), 0, SizeOf(header));

  // assemble the payload size, most significant byte first
  size := ((header[0] and $FF) shl 24)
       or ((header[1] and $FF) shl 16)
       or ((header[2] and $FF) shl 8)
       or  (header[3] and $FF);

  if size < 0 then begin
    Close();
    raise TTransportExceptionCorruptedData.Create('Read a negative frame size ('+IntToStr(size)+')');
  end;

  if Int64(size) > Int64(Configuration.MaxFrameSize) then begin
    Close();
    raise TTransportExceptionCorruptedData.Create('Frame size ('+IntToStr(size)+') larger than allowed maximum ('+IntToStr(Configuration.MaxFrameSize)+')');
  end;

  UpdateKnownMessageSize(size + SizeOf(size));

  // fetch the payload
  SetLength( payload, size);
  InnerTransport.ReadAll( payload, 0, size);

  // replace the previous frame's buffer, positioned at the start
  FreeAndNil( FReadBuffer);
  FReadBuffer := TMemoryStream.Create;
  if Length(payload) > 0 then begin
    FReadBuffer.Write( Pointer(@payload[0])^, size);
  end;
  FReadBuffer.Position := 0;
end;
1454
// Appends len bytes, taken from pBuf at offset off, to the write buffer.
// Nothing is sent to the wrapped transport here. Does nothing for len <= 0.
procedure TFramedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
var
  pSrc : PByte;
begin
  if len <= 0
  then Exit;

  pSrc := pBuf;
  Inc( pSrc, off);
  FWriteBuffer.Write( pSrc^, len);
end;
1465
1466
// Checks whether value bytes can still be read. Unread bytes of the current
// frame count as available; only the remainder is checked against the
// wrapped transport.
// FReadBuffer is created by ReadFrame only, so it is still nil until the
// first frame was read. In that case nothing is buffered and the full amount
// is delegated, instead of dereferencing a nil object.
procedure TFramedTransportImpl.CheckReadBytesAvailable( const value : Int64);
var buffered : Int64;
begin
  if FReadBuffer <> nil
  then buffered := FReadBuffer.Size - FReadBuffer.Position
  else buffered := 0;

  if buffered < value
  then InnerTransport.CheckReadBytesAvailable( value - buffered);
end;
1477
1478
{ TFramedTransportImpl.TFactory }
1480
// Factory method: wraps aTransport into a new framed transport.
function TFramedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
var
  framed : ITransport;
begin
  framed := TFramedTransportImpl.Create( aTransport);
  Result := framed;
end;
1485
1486 { TTcpSocketStreamImpl }
1487
// Closes the underlying socket.
procedure TTcpSocketStreamImpl.Close;
begin
  Self.FTcpClient.Close;
end;
1492
// Creates a stream on top of a socket object. The socket is only referenced,
// it is not freed by this class in the code shown here.
{$IFDEF OLD_SOCKETS}
// Old sockets: the timeout is stored and applied by Read/Write.
constructor TTcpSocketStreamImpl.Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer);
begin
  inherited Create;

  FTimeout   := aTimeout;
  FTcpClient := aTcpClient;
end;
{$ELSE}
// New sockets: the timeout is applied to the socket itself. A receive timeout
// of 0 is replaced by SLEEP_TIME, the send timeout is taken as given.
constructor TTcpSocketStreamImpl.Create( const aTcpClient: TSocket; const aTimeout : Longword);
begin
  inherited Create;

  FTcpClient := aTcpClient;

  if aTimeout <> 0
  then FTcpClient.RecvTimeout := aTimeout
  else FTcpClient.RecvTimeout := SLEEP_TIME;

  FTcpClient.SendTimeout := aTimeout;
end;
{$ENDIF}
1512
// Intentionally empty: Write hands data to the socket directly, so this
// class holds nothing that would need flushing.
procedure TTcpSocketStreamImpl.Flush;
begin
  // nothing to do
end;
1517
1518
// Reports whether the underlying socket is active (old sockets) or open
// (new sockets).
function TTcpSocketStreamImpl.IsOpen: Boolean;
begin
  Result := {$IFDEF OLD_SOCKETS} FTcpClient.Active {$ELSE} FTcpClient.IsOpen {$ENDIF};
end;
1527
// Opens the underlying socket.
procedure TTcpSocketStreamImpl.Open;
begin
  Self.FTcpClient.Open;
end;
1532
1533
{$IFDEF OLD_SOCKETS}
// Calls select() for the handle of the underlying socket.
//   ReadReady, WriteReady, ExceptFlag : optional out flags. For each pointer that
//       is assigned, the socket is put into the matching fd_set and the flag
//       receives the FD_ISSET result afterwards. Pass nil to skip a set.
//   TimeOut  : time to wait in milliseconds; a negative value waits forever
//   wsaError : set to WSAEINVAL when the socket is not active, to the
//       WSAGetLastError value when select() fails, and to 0 otherwise
// Returns the value returned by select(), or SOCKET_ERROR when the socket is
// not active or an exception was raised during the call.
function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
                                      TimeOut: Integer; var wsaError : Integer): Integer;
var
  ReadFds: TFDset;
  ReadFdsptr: PFDset;
  WriteFds: TFDset;
  WriteFdsptr: PFDset;
  ExceptFds: TFDset;
  ExceptFdsptr: PFDset;
  tv: timeval;
  Timeptr: PTimeval;
  socket : TSocket;
begin
  if not FTcpClient.Active then begin
    wsaError := WSAEINVAL;
    Exit( SOCKET_ERROR);
  end;

  socket := FTcpClient.Handle;

  // set up only those fd_sets the caller is interested in
  if Assigned(ReadReady) then begin
    ReadFdsptr := @ReadFds;
    FD_ZERO(ReadFds);
    FD_SET(socket, ReadFds);
  end
  else begin
    ReadFdsptr := nil;
  end;

  if Assigned(WriteReady) then begin
    WriteFdsptr := @WriteFds;
    FD_ZERO(WriteFds);
    FD_SET(socket, WriteFds);
  end
  else begin
    WriteFdsptr := nil;
  end;

  if Assigned(ExceptFlag) then begin
    ExceptFdsptr := @ExceptFds;
    FD_ZERO(ExceptFds);
    FD_SET(socket, ExceptFds);
  end
  else begin
    ExceptFdsptr := nil;
  end;

  // split the millisecond timeout into seconds and microseconds
  if TimeOut >= 0 then begin
    tv.tv_sec := TimeOut div 1000;
    tv.tv_usec := 1000 * (TimeOut mod 1000);
    Timeptr := @tv;
  end
  else begin
    Timeptr := nil; // wait forever
  end;

  wsaError := 0;
  try
    // NOTE(review): result is only assigned here when MSWINDOWS or LINUX is
    // defined - confirm no other platform is built with OLD_SOCKETS.
    {$IFDEF MSWINDOWS}
      {$IFDEF OLD_UNIT_NAMES}
      result := WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
      {$ELSE}
      result := Winapi.WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
      {$ENDIF}
    {$ENDIF}
    {$IFDEF LINUX}
      result := Libc.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
    {$ENDIF}

    if result = SOCKET_ERROR
    then wsaError := WSAGetLastError;

  except
    // any exception is reported as a socket error; wsaError keeps the value it
    // had when the exception was raised
    result := SOCKET_ERROR;
  end;

  // report back the state of each requested set
  if Assigned(ReadReady) then
   ReadReady^ := FD_ISSET(socket, ReadFds);

  if Assigned(WriteReady) then
    WriteReady^ := FD_ISSET(socket, WriteFds);

  if Assigned(ExceptFlag) then
    ExceptFlag^ := FD_ISSET(socket, ExceptFds);
end;
{$ENDIF}
1621
{$IFDEF OLD_SOCKETS}
// Waits until the socket has data to read and peeks at it without removing
// it from the socket.
//   TimeOut      : wait time in milliseconds, handed on to Select
//   pBuf         : buffer receiving the peeked bytes
//   DesiredBytes : maximum number of bytes to peek
//   wsaError     : error code as set by Select
//   bytesReady   : number of bytes peeked, 0 unless the result is wfd_HaveData
// Returns wfd_Error when Select or recv fails (recv returning 0 counts as an
// error too), wfd_Timeout when the socket did not become readable, and
// wfd_HaveData otherwise.
function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
                                           DesiredBytes : Integer;
                                           var wsaError, bytesReady : Integer): TWaitForData;
const
  MSG_PEEK = {$IFDEF OLD_UNIT_NAMES} WinSock.MSG_PEEK {$ELSE} Winapi.WinSock.MSG_PEEK {$ENDIF};
var
  readable, failed : Boolean;
  rc : Integer;
begin
  bytesReady := 0;

  // select() returns the number of ready socket handles contained in the
  // fd_set structures, zero if the time limit expired, or SOCKET_ERROR if an
  // error occurred (the specific error code then comes from WSAGetLastError).
  rc := Self.Select( @readable, nil, @failed, TimeOut, wsaError);

  if rc = SOCKET_ERROR then begin
    Result := TWaitForData.wfd_Error;
    Exit;
  end;

  if (rc = 0) or not readable then begin
    Result := TWaitForData.wfd_Timeout;
    Exit;
  end;

  // recv() returns the number of bytes received, or -1 if an error occurred.
  // It returns 0 when the peer has performed an orderly shutdown.
  rc := recv( FTcpClient.Handle, pBuf^, DesiredBytes, MSG_PEEK);

  if rc <= 0 then begin
    Result := TWaitForData.wfd_Error;
    Exit;
  end;

  // at least we have some data
  bytesReady := Min( rc, DesiredBytes);
  Result := TWaitForData.wfd_HaveData;
end;
{$ENDIF}
1655
1656 {$IFDEF OLD_SOCKETS}
// Reads data from the socket into pBuf, starting at the given offset.
// Waits for data using FTimeout, or DEFAULT_THRIFT_TIMEOUT when FTimeout is
// not positive. The outer loop ends as soon as some bytes were received, so
// the call returns after the first chunk rather than filling count bytes.
// Returns the number of bytes received. That is 0 when WaitForData reports an
// error, or when it times out while FTimeout is 0.
// Raises TTransportExceptionTimedOut on timeout when FTimeout is not 0.
function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
// old sockets version
var wfd : TWaitForData;
    wsaError,
    msecs : Integer;
    nBytes : Integer;
    pTmp : PByte;
begin
  inherited;

  if FTimeout > 0
  then msecs := FTimeout
  else msecs := DEFAULT_THRIFT_TIMEOUT;

  result := 0;
  pTmp := pBuf;
  Inc( pTmp, offset);
  while (count > 0) and (result = 0) do begin

    // wait until data can be read; every outcome other than wfd_HaveData
    // leaves the function
    while TRUE do begin
      wfd := WaitForData( msecs, pTmp, count, wsaError, nBytes);
      case wfd of
        TWaitForData.wfd_Error    :  Exit;
        TWaitForData.wfd_HaveData :  Break;
        TWaitForData.wfd_Timeout  :  begin
          // with no timeout configured a timeout just ends the read quietly
          if (FTimeout = 0)
          then Exit
          else raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
        end;
      else
        ASSERT( FALSE);
      end;
    end;

    // reduce the timeout once we got data
    // (a tenth of the initial value, but not less than 200 ms)
    if FTimeout > 0
    then msecs := FTimeout div 10
    else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
    msecs := Max( msecs, 200);

    // WaitForData only peeked at the data, now actually take it off the socket
    ASSERT( nBytes <= count);
    nBytes := FTcpClient.ReceiveBuf( pTmp^, nBytes);
    Inc( pTmp, nBytes);
    Dec( count, nBytes);
    Inc( result, nBytes);
  end;
end;
1704
// Receives FTcpClient.BytesReceived bytes from the socket and returns them.
// Returns an empty array when the socket is not open or that count is 0.
function TTcpSocketStreamImpl.ToArray: TBytes;
// old sockets version
var
  cbData : Integer;
begin
  if IsOpen
  then cbData := FTcpClient.BytesReceived
  else cbData := 0;

  SetLength( Result, cbData);

  if cbData > 0
  then FTcpClient.ReceiveBuf( Pointer(@Result[0])^, cbData);
end;
1720
// Sends count bytes, taken from pBuf at the given offset, over the socket.
// Waits up to FTimeout milliseconds for the socket to become writable first.
// Raises TTransportExceptionNotOpen when the socket is not active,
// TTransportExceptionTimedOut when it does not become writable in time, and
// TTransportExceptionUnknown on select or send errors.
// The result of SendBuf is checked: a failed send raises an exception and a
// partial send is continued until all bytes are out. The return value used to
// be ignored, so both cases lost data without any notice.
procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
// old sockets version
var bCanWrite, bError : Boolean;
    retval, wsaError : Integer;
    nSent : Integer;
    pTmp : PByte;
begin
  inherited;

  if not FTcpClient.Active
  then raise TTransportExceptionNotOpen.Create('not open');

  // The select function returns the total number of socket handles that are ready
  // and contained in the fd_set structures, zero if the time limit expired,
  // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
  // WSAGetLastError can be used to retrieve a specific error code.
  retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);
  if retval = SOCKET_ERROR
  then raise TTransportExceptionUnknown.Create(SysErrorMessage(Cardinal(wsaError)));

  if (retval = 0)
  then raise TTransportExceptionTimedOut.Create('timed out');

  if bError or not bCanWrite
  then raise TTransportExceptionUnknown.Create('unknown error');

  pTmp := pBuf;
  Inc( pTmp, offset);

  // SendBuf reports the number of bytes actually sent, keep going until done
  while count > 0 do begin
    nSent := FTcpClient.SendBuf( pTmp^, count);
    if nSent <= 0
    then raise TTransportExceptionUnknown.Create('error sending data');

    Inc( pTmp, nSent);
    Dec( count, nSent);
  end;
end;
1750
1751 {$ELSE}
1752
// Reads up to count bytes from the socket into pBuf, starting at the given
// offset. Keeps reading until count bytes were received or the socket
// returns 0 bytes. Returns the number of bytes received.
function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
// new sockets version
var
  chunk : Integer;
  pDest : PByte;
begin
  inherited;

  pDest := pBuf;
  Inc( pDest, offset);

  Result := 0;
  while count > 0 do begin
    chunk := FTcpClient.Read( pDest^, count);
    if chunk = 0
    then Break;  // no more data

    Inc( Result, chunk);
    Inc( pDest, chunk);
    Dec( count, chunk);
  end;
end;
1771
// Reads the data the socket currently delivers, in chunks of 1024 bytes,
// until a read returns less than a full chunk. Returns exactly the bytes
// received, or an empty array when Peek reports no data.
// Exceptions of type TTransportException are swallowed; the bytes received up
// to that point are returned.
// Fixes compared to the previous implementation:
//  - Result is initialised before it is grown
//  - the array is always trimmed to the number of bytes really received; this
//    used to be skipped when the last read returned 0, and was based on a
//    stale count when a read raised an exception
function TTcpSocketStreamImpl.ToArray: TBytes;
// new sockets version
const
  CHUNK_SIZE = 1024;
var
  total, got : Integer;
begin
  Result := nil;
  total  := 0;
  try
    if FTcpClient.Peek then
      repeat
        SetLength( Result, total + CHUNK_SIZE);
        got := FTcpClient.Read( Result[total], CHUNK_SIZE);
        if got > 0
        then Inc( total, got);
      until got < CHUNK_SIZE;
  except
    on TTransportException do begin { don't allow default exceptions } end;
    else raise;
  end;

  // cut off the unused part of the last chunk
  SetLength( Result, total);
end;
1790
// Writes count bytes, taken from pBuf at the given offset, to the socket.
// Raises TTransportExceptionNotOpen when the socket is not open.
procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
// new sockets version
var
  pSrc : PByte;
begin
  inherited;

  if not FTcpClient.IsOpen then begin
    raise TTransportExceptionNotOpen.Create('not open');
  end;

  pSrc := pBuf;
  Inc( pSrc, offset);
  FTcpClient.Write( pSrc^, count);
end;
1804
1805 {$ENDIF}
1806
1807
1808 end.
1809