2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
19 unit Thrift.Transport;
21 {$I Thrift.Defines.inc}
31 {$IFDEF OLD_UNIT_NAMES}
48 ITransport = interface
49 ['{DB84961E-8BB3-4532-99E1-A8C7AC2300F7}']
50 function GetIsOpen: Boolean;
51 property IsOpen: Boolean read GetIsOpen;
52 function Peek: Boolean;
55 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
56 function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
57 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
58 function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
59 procedure Write( const buf: TBytes); overload;
60 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
61 procedure Write( const pBuf : Pointer; off, len : Integer); overload;
62 procedure Write( const pBuf : Pointer; len : Integer); overload;
66 TTransportImpl = class( TInterfacedObject, ITransport)
68 function GetIsOpen: Boolean; virtual; abstract;
69 property IsOpen: Boolean read GetIsOpen;
70 function Peek: Boolean; virtual;
71 procedure Open(); virtual; abstract;
72 procedure Close(); virtual; abstract;
73 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
74 function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual; abstract;
75 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
76 function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual;
77 procedure Write( const buf: TBytes); overload; inline;
78 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; inline;
79 procedure Write( const pBuf : Pointer; len : Integer); overload; inline;
80 procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract;
81 procedure Flush; virtual;
84 TTransportException = class( TException)
97 function GetType: TExceptionType;
99 constructor HiddenCreate(const Msg: string);
101 class function Create( AType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
102 class function Create( const msg: string): TTransportException; reintroduce; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
103 class function Create( AType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
104 property Type_: TExceptionType read GetType;
107 // Needed to remove deprecation warning
108 TTransportExceptionSpecialized = class abstract (TTransportException)
110 constructor Create(const Msg: string);
113 TTransportExceptionUnknown = class (TTransportExceptionSpecialized);
114 TTransportExceptionNotOpen = class (TTransportExceptionSpecialized);
115 TTransportExceptionAlreadyOpen = class (TTransportExceptionSpecialized);
116 TTransportExceptionTimedOut = class (TTransportExceptionSpecialized);
117 TTransportExceptionEndOfFile = class (TTransportExceptionSpecialized);
118 TTransportExceptionBadArgs = class (TTransportExceptionSpecialized);
119 TTransportExceptionInterrupted = class (TTransportExceptionSpecialized);
122 SSL_2, SSL_3, TLS_1, // outdated, for compatibilty only
123 TLS_1_1, TLS_1_2 // secure (as of today)
126 TSecureProtocols = set of TSecureProtocol;
128 IHTTPClient = interface( ITransport )
129 ['{7BF615DD-8680-4004-A5B2-88947BA3BA3D}']
130 procedure SetDnsResolveTimeout(const Value: Integer);
131 function GetDnsResolveTimeout: Integer;
132 procedure SetConnectionTimeout(const Value: Integer);
133 function GetConnectionTimeout: Integer;
134 procedure SetSendTimeout(const Value: Integer);
135 function GetSendTimeout: Integer;
136 procedure SetReadTimeout(const Value: Integer);
137 function GetReadTimeout: Integer;
138 function GetCustomHeaders: IThriftDictionary<string,string>;
139 procedure SendRequest;
140 function GetSecureProtocols : TSecureProtocols;
141 procedure SetSecureProtocols( const value : TSecureProtocols);
143 property DnsResolveTimeout: Integer read GetDnsResolveTimeout write SetDnsResolveTimeout;
144 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
145 property SendTimeout: Integer read GetSendTimeout write SetSendTimeout;
146 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
147 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
148 property SecureProtocols : TSecureProtocols read GetSecureProtocols write SetSecureProtocols;
151 IServerTransport = interface
152 ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
155 function Accept( const fnAccepting: TProc): ITransport;
158 TServerTransportImpl = class( TInterfacedObject, IServerTransport)
160 procedure Listen; virtual; abstract;
161 procedure Close; virtual; abstract;
162 function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
165 ITransportFactory = interface
166 ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
167 function GetTransport( const ATrans: ITransport): ITransport;
170 TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
171 function GetTransport( const ATrans: ITransport): ITransport; virtual;
174 TTcpSocketStreamImpl = class( TThriftStreamImpl )
177 TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
179 FTcpClient : TCustomIpClient;
181 function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
182 TimeOut: Integer; var wsaError : Integer): Integer;
183 function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
184 var wsaError, bytesReady : Integer): TWaitForData;
191 procedure Write( const pBuf : Pointer; offset, count: Integer); override;
192 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
193 procedure Open; override;
194 procedure Close; override;
195 procedure Flush; override;
197 function IsOpen: Boolean; override;
198 function ToArray: TBytes; override;
201 constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);
203 constructor Create( const ATcpClient: TSocket; const aTimeout : Longword = 0);
207 IStreamTransport = interface( ITransport )
208 ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
209 function GetInputStream: IThriftStream;
210 function GetOutputStream: IThriftStream;
211 property InputStream : IThriftStream read GetInputStream;
212 property OutputStream : IThriftStream read GetOutputStream;
215 TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
217 FInputStream : IThriftStream;
218 FOutputStream : IThriftStream;
220 function GetIsOpen: Boolean; override;
222 function GetInputStream: IThriftStream;
223 function GetOutputStream: IThriftStream;
225 property InputStream : IThriftStream read GetInputStream;
226 property OutputStream : IThriftStream read GetOutputStream;
228 procedure Open; override;
229 procedure Close; override;
230 procedure Flush; override;
231 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
232 procedure Write( const pBuf : Pointer; off, len : Integer); override;
233 constructor Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
234 destructor Destroy; override;
237 TBufferedStreamImpl = class( TThriftStreamImpl)
239 FStream : IThriftStream;
241 FReadBuffer : TMemoryStream;
242 FWriteBuffer : TMemoryStream;
244 procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
245 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
246 procedure Open; override;
247 procedure Close; override;
248 procedure Flush; override;
249 function IsOpen: Boolean; override;
250 function ToArray: TBytes; override;
252 constructor Create( const AStream: IThriftStream; ABufSize: Integer);
253 destructor Destroy; override;
256 TServerSocketImpl = class( TServerTransportImpl)
259 FServer : TTcpServer;
261 FClientTimeout : Integer;
263 FServer: TServerSocket;
265 FUseBufferedSocket : Boolean;
266 FOwnsServer : Boolean;
268 function Accept( const fnAccepting: TProc) : ITransport; override;
271 constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
272 constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
274 constructor Create( const AServer: TServerSocket; AClientTimeout: Longword = 0); overload;
275 constructor Create( APort: Integer; AClientTimeout: Longword = 0; AUseBufferedSockets: Boolean = FALSE); overload;
277 destructor Destroy; override;
278 procedure Listen; override;
279 procedure Close; override;
282 TBufferedTransportImpl = class( TTransportImpl )
284 FInputBuffer : IThriftStream;
285 FOutputBuffer : IThriftStream;
286 FTransport : IStreamTransport;
289 procedure InitBuffers;
290 function GetUnderlyingTransport: ITransport;
292 function GetIsOpen: Boolean; override;
293 procedure Flush; override;
295 procedure Open(); override;
296 procedure Close(); override;
297 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
298 procedure Write( const pBuf : Pointer; off, len : Integer); override;
299 constructor Create( const ATransport : IStreamTransport ); overload;
300 constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;
301 property UnderlyingTransport: ITransport read GetUnderlyingTransport;
302 property IsOpen: Boolean read GetIsOpen;
305 TSocketImpl = class(TStreamTransportImpl)
308 FClient : TCustomIpClient;
312 FOwnsClient : Boolean;
321 procedure InitSocket;
323 function GetIsOpen: Boolean; override;
325 procedure Open; override;
327 constructor Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
328 constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
330 constructor Create(const AClient: TSocket; aOwnsClient: Boolean); overload;
331 constructor Create( const AHost: string; APort: Integer; ATimeout: Longword = 0); overload;
333 destructor Destroy; override;
334 procedure Close; override;
336 property TcpClient: TCustomIpClient read FClient;
338 property TcpClient: TSocket read FClient;
340 property Host : string read FHost;
341 property Port: Integer read FPort;
344 TFramedTransportImpl = class( TTransportImpl)
346 FHeaderSize : Integer = 4;
348 FHeader_Dummy : array of Byte;
350 FTransport : ITransport;
351 FWriteBuffer : TMemoryStream;
352 FReadBuffer : TMemoryStream;
354 procedure InitWriteBuffer;
358 TFactory = class( TTransportFactoryImpl )
360 function GetTransport( const ATrans: ITransport): ITransport; override;
363 {$IFDEF HAVE_CLASS_CTOR}
364 class constructor Create;
367 constructor Create; overload;
368 constructor Create( const ATrans: ITransport); overload;
369 destructor Destroy; override;
371 procedure Open(); override;
372 function GetIsOpen: Boolean; override;
374 procedure Close(); override;
375 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
376 procedure Write( const pBuf : Pointer; off, len : Integer); override;
377 procedure Flush; override;
380 {$IFNDEF HAVE_CLASS_CTOR}
381 procedure TFramedTransportImpl_Initialize;
385 DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
386 DEFAULT_THRIFT_SECUREPROTOCOLS = [ TSecureProtocol.TLS_1_1, TSecureProtocol.TLS_1_2];
394 procedure TTransportImpl.Flush;
399 function TTransportImpl.Peek: Boolean;
404 function TTransportImpl.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
407 then result := Read( @buf[0], Length(buf), off, len)
411 function TTransportImpl.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
414 then result := ReadAll( @buf[0], Length(buf), off, len)
418 procedure TTransportImpl.Write( const buf: TBytes);
421 then Write( @buf[0], 0, Length(buf));
424 procedure TTransportImpl.Write( const buf: TBytes; off: Integer; len: Integer);
427 then Write( @buf[0], off, len);
430 function TTransportImpl.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
434 while result < len do begin
435 ret := Read( pBuf, buflen, off + result, len - result);
437 then Inc( result, ret)
438 else raise TTransportExceptionNotOpen.Create( 'Cannot read, Remote side has closed' );
442 procedure TTransportImpl.Write( const pBuf : Pointer; len : Integer);
444 Self.Write( pBuf, 0, len);
447 { TTransportException }
449 function TTransportException.GetType: TExceptionType;
451 if Self is TTransportExceptionNotOpen then Result := TExceptionType.NotOpen
452 else if Self is TTransportExceptionAlreadyOpen then Result := TExceptionType.AlreadyOpen
453 else if Self is TTransportExceptionTimedOut then Result := TExceptionType.TimedOut
454 else if Self is TTransportExceptionEndOfFile then Result := TExceptionType.EndOfFile
455 else if Self is TTransportExceptionBadArgs then Result := TExceptionType.BadArgs
456 else if Self is TTransportExceptionInterrupted then Result := TExceptionType.Interrupted
457 else Result := TExceptionType.Unknown;
460 constructor TTransportException.HiddenCreate(const Msg: string);
462 inherited Create(Msg);
465 class function TTransportException.Create(AType: TExceptionType): TTransportException;
468 {$WARN SYMBOL_DEPRECATED OFF}
469 Result := Create(AType, '')
470 {$WARN SYMBOL_DEPRECATED DEFAULT}
473 class function TTransportException.Create(AType: TExceptionType;
474 const msg: string): TTransportException;
477 TExceptionType.NotOpen: Result := TTransportExceptionNotOpen.Create(msg);
478 TExceptionType.AlreadyOpen: Result := TTransportExceptionAlreadyOpen.Create(msg);
479 TExceptionType.TimedOut: Result := TTransportExceptionTimedOut.Create(msg);
480 TExceptionType.EndOfFile: Result := TTransportExceptionEndOfFile.Create(msg);
481 TExceptionType.BadArgs: Result := TTransportExceptionBadArgs.Create(msg);
482 TExceptionType.Interrupted: Result := TTransportExceptionInterrupted.Create(msg);
484 Result := TTransportExceptionUnknown.Create(msg);
488 class function TTransportException.Create(const msg: string): TTransportException;
490 Result := TTransportExceptionUnknown.Create(Msg);
493 { TTransportExceptionSpecialized }
495 constructor TTransportExceptionSpecialized.Create(const Msg: string);
497 inherited HiddenCreate(Msg);
500 { TTransportFactoryImpl }
502 function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
510 constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
514 FClientTimeout := AClientTimeout;
517 constructor TServerSocketImpl.Create( const AServer: TServerSocket; AClientTimeout: Longword);
521 FServer.RecvTimeout := AClientTimeout;
522 FServer.SendTimeout := AClientTimeout;
527 constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
529 constructor TServerSocketImpl.Create(APort: Integer; AClientTimeout: Longword; AUseBufferedSockets: Boolean);
535 FClientTimeout := AClientTimeout;
536 FServer := TTcpServer.Create( nil );
537 FServer.BlockMode := bmBlocking;
538 {$IF CompilerVersion >= 21.0}
539 FServer.LocalPort := AnsiString( IntToStr( FPort));
541 FServer.LocalPort := IntToStr( FPort);
544 FServer := TServerSocket.Create(APort, AClientTimeout, AClientTimeout);
546 FUseBufferedSocket := AUseBufferedSockets;
550 destructor TServerSocketImpl.Destroy;
552 if FOwnsServer then begin
559 function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
562 client : TCustomIpClient;
566 trans : IStreamTransport;
568 if FServer = nil then begin
569 raise TTransportExceptionNotOpen.Create('No underlying server socket.');
575 client := TCustomIpClient.Create(nil);
577 if Assigned(fnAccepting)
580 if not FServer.Accept( client) then begin
586 if client = nil then begin
591 trans := TSocketImpl.Create( client, TRUE, FClientTimeout);
592 client := nil; // trans owns it now
594 if FUseBufferedSocket
595 then result := TBufferedTransportImpl.Create( trans)
596 else result := trans;
599 on E: Exception do begin
601 raise TTransportExceptionUnknown.Create(E.ToString);
605 if Assigned(fnAccepting) then
608 client := FServer.Accept;
610 trans := TSocketImpl.Create(client, True);
613 if FUseBufferedSocket then
614 Result := TBufferedTransportImpl.Create(trans)
624 procedure TServerSocketImpl.Listen;
626 if FServer <> nil then
630 FServer.Active := True;
633 do raise TTransportExceptionUnknown.Create('Could not accept on listening socket: ' + E.Message);
641 procedure TServerSocketImpl.Close;
643 if FServer <> nil then
646 FServer.Active := False;
649 do raise TTransportExceptionUnknown.Create('Error on closing socket : ' + E.Message);
659 constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
660 var stream : IThriftStream;
663 FTimeout := ATimeout;
664 FOwnsClient := aOwnsClient;
665 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
666 inherited Create( stream, stream);
669 constructor TSocketImpl.Create(const AClient: TSocket; aOwnsClient: Boolean);
670 var stream : IThriftStream;
673 FTimeout := AClient.RecvTimeout;
674 FOwnsClient := aOwnsClient;
675 stream := TTcpSocketStreamImpl.Create(FClient, FTimeout);
676 inherited Create(stream, stream);
681 constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
683 constructor TSocketImpl.Create(const AHost: string; APort: Integer; ATimeout: Longword);
686 inherited Create(nil,nil);
689 FTimeout := ATimeout;
693 destructor TSocketImpl.Destroy;
696 then FreeAndNil( FClient);
700 procedure TSocketImpl.Close;
705 FOutputStream := nil;
708 then FreeAndNil( FClient)
712 function TSocketImpl.GetIsOpen: Boolean;
715 Result := (FClient <> nil) and FClient.Connected;
717 Result := (FClient <> nil) and FClient.IsOpen
721 procedure TSocketImpl.InitSocket;
723 stream : IThriftStream;
726 then FreeAndNil( FClient)
730 FClient := TTcpClient.Create( nil);
732 FClient := TSocket.Create(FHost, FPort);
736 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
737 FInputStream := stream;
738 FOutputStream := stream;
741 procedure TSocketImpl.Open;
744 raise TTransportExceptionAlreadyOpen.Create('Socket already connected');
747 if FHost = '' then begin
748 raise TTransportExceptionNotOpen.Create('Cannot open null host');
751 if Port <= 0 then begin
752 raise TTransportExceptionNotOpen.Create('Cannot open without port');
759 FClient.RemoteHost := TSocketHost( Host);
760 FClient.RemotePort := TSocketPort( IntToStr( Port));
766 FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
767 FOutputStream := FInputStream;
772 procedure TBufferedStreamImpl.Close;
784 constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
788 FBufSize := ABufSize;
789 FReadBuffer := TMemoryStream.Create;
790 FWriteBuffer := TMemoryStream.Create;
793 destructor TBufferedStreamImpl.Destroy;
799 procedure TBufferedStreamImpl.Flush;
805 len := FWriteBuffer.Size;
806 if len > 0 then begin
807 SetLength( buf, len );
808 FWriteBuffer.Position := 0;
809 FWriteBuffer.Read( Pointer(@buf[0])^, len );
810 FStream.Write( buf, 0, len );
816 function TBufferedStreamImpl.IsOpen: Boolean;
818 Result := (FWriteBuffer <> nil)
819 and (FReadBuffer <> nil)
824 procedure TBufferedStreamImpl.Open;
829 function TBufferedStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
839 while count > 0 do begin
841 if FReadBuffer.Position >= FReadBuffer.Size then begin
843 SetLength( tempbuf, FBufSize);
844 nRead := FStream.Read( tempbuf, 0, FBufSize );
845 if nRead = 0 then Break; // avoid infinite loop
847 FReadBuffer.WriteBuffer( Pointer(@tempbuf[0])^, nRead );
848 FReadBuffer.Position := 0;
851 if FReadBuffer.Position < FReadBuffer.Size then begin
852 nRead := Min( FReadBuffer.Size - FReadBuffer.Position, count);
855 Inc( Result, FReadBuffer.Read( pTmp^, nRead));
863 function TBufferedStreamImpl.ToArray: TBytes;
869 len := FReadBuffer.Size;
872 SetLength( Result, len);
874 if len > 0 then begin
875 FReadBuffer.Position := 0;
876 FReadBuffer.Read( Pointer(@Result[0])^, len );
880 procedure TBufferedStreamImpl.Write( const pBuf : Pointer; offset: Integer; count: Integer);
884 if count > 0 then begin
888 FWriteBuffer.Write( pTmp^, count );
889 if FWriteBuffer.Size > FBufSize then begin
896 { TStreamTransportImpl }
898 constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
901 FInputStream := AInputStream;
902 FOutputStream := AOutputStream;
905 destructor TStreamTransportImpl.Destroy;
908 FOutputStream := nil;
912 procedure TStreamTransportImpl.Close;
915 FOutputStream := nil;
918 procedure TStreamTransportImpl.Flush;
920 if FOutputStream = nil then begin
921 raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );
927 function TStreamTransportImpl.GetInputStream: IThriftStream;
929 Result := FInputStream;
932 function TStreamTransportImpl.GetIsOpen: Boolean;
937 function TStreamTransportImpl.GetOutputStream: IThriftStream;
939 Result := FOutputStream;
942 procedure TStreamTransportImpl.Open;
947 function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
949 if FInputStream = nil then begin
950 raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
953 Result := FInputStream.Read( pBuf,buflen, off, len );
956 procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
958 if FOutputStream = nil then begin
959 raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );
962 FOutputStream.Write( pBuf, off, len );
965 { TBufferedTransportImpl }
967 constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
970 Create( ATransport, 1024 );
973 constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport; ABufSize: Integer);
976 FTransport := ATransport;
977 FBufSize := ABufSize;
981 procedure TBufferedTransportImpl.Close;
985 FOutputBuffer := nil;
988 procedure TBufferedTransportImpl.Flush;
990 if FOutputBuffer <> nil then begin
995 function TBufferedTransportImpl.GetIsOpen: Boolean;
997 Result := FTransport.IsOpen;
1000 function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
1002 Result := FTransport;
1005 procedure TBufferedTransportImpl.InitBuffers;
1007 if FTransport.InputStream <> nil then begin
1008 FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize );
1010 if FTransport.OutputStream <> nil then begin
1011 FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize );
1015 procedure TBufferedTransportImpl.Open;
1018 InitBuffers; // we need to get the buffers to match FTransport substreams again
1021 function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
1024 if FInputBuffer <> nil then begin
1025 Result := FInputBuffer.Read( pBuf,buflen, off, len );
1029 procedure TBufferedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
1031 if FOutputBuffer <> nil then begin
1032 FOutputBuffer.Write( pBuf, off, len );
1036 { TFramedTransportImpl }
1038 {$IFDEF HAVE_CLASS_CTOR}
1039 class constructor TFramedTransportImpl.Create;
1041 SetLength( FHeader_Dummy, FHeaderSize);
1042 FillChar( FHeader_Dummy[0], Length( FHeader_Dummy) * SizeOf( Byte ), 0);
1045 procedure TFramedTransportImpl_Initialize;
1047 SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
1048 FillChar( TFramedTransportImpl.FHeader_Dummy[0],
1049 Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte ), 0);
1053 constructor TFramedTransportImpl.Create;
1059 procedure TFramedTransportImpl.Close;
1064 constructor TFramedTransportImpl.Create( const ATrans: ITransport);
1068 FTransport := ATrans;
1071 destructor TFramedTransportImpl.Destroy;
1078 procedure TFramedTransportImpl.Flush;
1085 len := FWriteBuffer.Size;
1086 SetLength( buf, len);
1087 if len > 0 then begin
1088 System.Move( FWriteBuffer.Memory^, buf[0], len );
1091 data_len := len - FHeaderSize;
1092 if (data_len < 0) then begin
1093 raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: data_len < 0' );
1098 buf[0] := Byte($FF and (data_len shr 24));
1099 buf[1] := Byte($FF and (data_len shr 16));
1100 buf[2] := Byte($FF and (data_len shr 8));
1101 buf[3] := Byte($FF and data_len);
1103 FTransport.Write( buf, 0, len );
1107 function TFramedTransportImpl.GetIsOpen: Boolean;
1109 Result := FTransport.IsOpen;
1113 TAccessMemoryStream = class(TMemoryStream)
1116 procedure TFramedTransportImpl.InitWriteBuffer;
1119 FWriteBuffer := TMemoryStream.Create;
1120 TAccessMemoryStream(FWriteBuffer).Capacity := 1024;
1121 FWriteBuffer.Write( Pointer(@FHeader_Dummy[0])^, FHeaderSize);
1124 procedure TFramedTransportImpl.Open;
1129 function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
1132 if len > (buflen-off)
1133 then len := buflen-off;
1138 if (FReadBuffer <> nil) and (len > 0) then begin
1139 result := FReadBuffer.Read( pTmp^, len);
1140 if result > 0 then begin
1147 then Result := FReadBuffer.Read( pTmp^, len)
1151 procedure TFramedTransportImpl.ReadFrame;
1157 SetLength( i32rd, FHeaderSize );
1158 FTransport.ReadAll( i32rd, 0, FHeaderSize);
1160 ((i32rd[0] and $FF) shl 24) or
1161 ((i32rd[1] and $FF) shl 16) or
1162 ((i32rd[2] and $FF) shl 8) or
1164 SetLength( buff, size );
1165 FTransport.ReadAll( buff, 0, size );
1167 FReadBuffer := TMemoryStream.Create;
1169 then FReadBuffer.Write( Pointer(@buff[0])^, size );
1170 FReadBuffer.Position := 0;
1173 procedure TFramedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
1176 if len > 0 then begin
1180 FWriteBuffer.Write( pTmp^, len );
1184 { TFramedTransport.TFactory }
1186 function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
1188 Result := TFramedTransportImpl.Create( ATrans );
1191 { TTcpSocketStreamImpl }
1193 procedure TTcpSocketStreamImpl.Close;
1198 {$IFDEF OLD_SOCKETS}
1199 constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
1202 FTcpClient := ATcpClient;
1203 FTimeout := aTimeout;
1206 constructor TTcpSocketStreamImpl.Create( const ATcpClient: TSocket; const aTimeout : Longword);
1209 FTcpClient := ATcpClient;
1210 if aTimeout = 0 then
1211 FTcpClient.RecvTimeout := SLEEP_TIME
1213 FTcpClient.RecvTimeout := aTimeout;
1214 FTcpClient.SendTimeout := aTimeout;
1218 procedure TTcpSocketStreamImpl.Flush;
1223 function TTcpSocketStreamImpl.IsOpen: Boolean;
1225 {$IFDEF OLD_SOCKETS}
1226 Result := FTcpClient.Active;
1228 Result := FTcpClient.IsOpen;
1232 procedure TTcpSocketStreamImpl.Open;
1238 {$IFDEF OLD_SOCKETS}
1239 function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
1240 TimeOut: Integer; var wsaError : Integer): Integer;
1245 WriteFdsptr: PFDset;
1247 ExceptFdsptr: PFDset;
1252 if not FTcpClient.Active then begin
1253 wsaError := WSAEINVAL;
1254 Exit( SOCKET_ERROR);
1257 socket := FTcpClient.Handle;
1259 if Assigned(ReadReady) then begin
1260 ReadFdsptr := @ReadFds;
1262 FD_SET(socket, ReadFds);
1268 if Assigned(WriteReady) then begin
1269 WriteFdsptr := @WriteFds;
1271 FD_SET(socket, WriteFds);
1277 if Assigned(ExceptFlag) then begin
1278 ExceptFdsptr := @ExceptFds;
1280 FD_SET(socket, ExceptFds);
1283 ExceptFdsptr := nil;
1286 if TimeOut >= 0 then begin
1287 tv.tv_sec := TimeOut div 1000;
1288 tv.tv_usec := 1000 * (TimeOut mod 1000);
1292 Timeptr := nil; // wait forever
1298 {$IFDEF OLD_UNIT_NAMES}
1299 result := WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1301 result := Winapi.WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1305 result := Libc.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1308 if result = SOCKET_ERROR
1309 then wsaError := WSAGetLastError;
1312 result := SOCKET_ERROR;
1315 if Assigned(ReadReady) then
1316 ReadReady^ := FD_ISSET(socket, ReadFds);
1318 if Assigned(WriteReady) then
1319 WriteReady^ := FD_ISSET(socket, WriteFds);
1321 if Assigned(ExceptFlag) then
1322 ExceptFlag^ := FD_ISSET(socket, ExceptFds);
1326 {$IFDEF OLD_SOCKETS}
1327 function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
1328 DesiredBytes : Integer;
1329 var wsaError, bytesReady : Integer): TWaitForData;
1330 var bCanRead, bError : Boolean;
1333 MSG_PEEK = {$IFDEF OLD_UNIT_NAMES} WinSock.MSG_PEEK {$ELSE} Winapi.WinSock.MSG_PEEK {$ENDIF};
1337 // The select function returns the total number of socket handles that are ready
1338 // and contained in the fd_set structures, zero if the time limit expired,
1339 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1340 // WSAGetLastError can be used to retrieve a specific error code.
1341 retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
1342 if retval = SOCKET_ERROR
1343 then Exit( TWaitForData.wfd_Error);
1344 if (retval = 0) or not bCanRead
1345 then Exit( TWaitForData.wfd_Timeout);
1347 // recv() returns the number of bytes received, or -1 if an error occurred.
1348 // The return value will be 0 when the peer has performed an orderly shutdown.
1350 retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, MSG_PEEK);
1352 then Exit( TWaitForData.wfd_Error);
1354 // at least we have some data
1355 bytesReady := Min( retval, DesiredBytes);
1356 result := TWaitForData.wfd_HaveData;
1360 {$IFDEF OLD_SOCKETS}
1361 function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
1362 // old sockets version
1363 var wfd : TWaitForData;
1372 then msecs := FTimeout
1373 else msecs := DEFAULT_THRIFT_TIMEOUT;
1378 while count > 0 do begin
1381 wfd := WaitForData( msecs, pTmp, count, wsaError, nBytes);
1383 TWaitForData.wfd_Error : Exit;
1384 TWaitForData.wfd_HaveData : Break;
1385 TWaitForData.wfd_Timeout : begin
1389 raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
1398 // reduce the timeout once we got data
1400 then msecs := FTimeout div 10
1401 else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
1402 msecs := Max( msecs, 200);
1404 ASSERT( nBytes <= count);
1405 nBytes := FTcpClient.ReceiveBuf( pTmp^, nBytes);
1407 Dec( count, nBytes);
1408 Inc( result, nBytes);
1412 function TTcpSocketStreamImpl.ToArray: TBytes;
1413 // old sockets version
1417 if IsOpen then begin
1418 len := FTcpClient.BytesReceived;
1421 SetLength( Result, len );
1423 if len > 0 then begin
1424 FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);
1428 procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
1429 // old sockets version
1430 var bCanWrite, bError : Boolean;
1431 retval, wsaError : Integer;
1436 if not FTcpClient.Active
1437 then raise TTransportExceptionNotOpen.Create('not open');
1439 // The select function returns the total number of socket handles that are ready
1440 // and contained in the fd_set structures, zero if the time limit expired,
1441 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1442 // WSAGetLastError can be used to retrieve a specific error code.
1443 retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);
1444 if retval = SOCKET_ERROR
1445 then raise TTransportExceptionUnknown.Create(SysErrorMessage(Cardinal(wsaError)));
1448 then raise TTransportExceptionTimedOut.Create('timed out');
1450 if bError or not bCanWrite
1451 then raise TTransportExceptionUnknown.Create('unknown error');
1455 FTcpClient.SendBuf( pTmp^, count);
1460 function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
1461 // new sockets version
1462 var nBytes : Integer;
1470 while count > 0 do begin
1471 nBytes := FTcpClient.Read( pTmp^, count);
1472 if nBytes = 0 then Exit;
1474 Dec( count, nBytes);
1475 Inc( result, nBytes);
1479 function TTcpSocketStreamImpl.ToArray: TBytes;
1480 // new sockets version
1485 if FTcpClient.Peek then
1487 SetLength(Result, Length(Result) + 1024);
1488 len := FTcpClient.Read(Result[Length(Result) - 1024], 1024);
1491 on TTransportException do begin { don't allow default exceptions } end;
1495 SetLength(Result, Length(Result) - 1024 + len);
1498 procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
1499 // new sockets version
1504 if not FTcpClient.IsOpen
1505 then raise TTransportExceptionNotOpen.Create('not open');
1509 FTcpClient.Write( pTmp^, count);
1515 {$IF CompilerVersion < 21.0}
1518 TFramedTransportImpl_Initialize;