]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/delphi/src/Thrift.Transport.pas
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / delphi / src / Thrift.Transport.pas
CommitLineData
f67539c2
TL
1(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
19unit Thrift.Transport;
20
21{$I Thrift.Defines.inc}
22{$SCOPEDENUMS ON}
23
24interface
25
26uses
27 Classes,
28 SysUtils,
29 Math,
30 Generics.Collections,
31 {$IFDEF OLD_UNIT_NAMES}
32 WinSock, Sockets,
33 {$ELSE}
34 Winapi.WinSock,
35 {$IFDEF OLD_SOCKETS}
36 Web.Win.Sockets,
37 {$ELSE}
38 Thrift.Socket,
39 {$ENDIF}
40 {$ENDIF}
41 Thrift.Collections,
42 Thrift.Exception,
43 Thrift.Utils,
44 Thrift.WinHTTP,
45 Thrift.Stream;
46
47type
48 ITransport = interface
49 ['{DB84961E-8BB3-4532-99E1-A8C7AC2300F7}']
50 function GetIsOpen: Boolean;
51 property IsOpen: Boolean read GetIsOpen;
52 function Peek: Boolean;
53 procedure Open;
54 procedure Close;
55 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
56 function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
57 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
58 function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
59 procedure Write( const buf: TBytes); overload;
60 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
61 procedure Write( const pBuf : Pointer; off, len : Integer); overload;
62 procedure Write( const pBuf : Pointer; len : Integer); overload;
63 procedure Flush;
64 end;
65
66 TTransportImpl = class( TInterfacedObject, ITransport)
67 protected
68 function GetIsOpen: Boolean; virtual; abstract;
69 property IsOpen: Boolean read GetIsOpen;
70 function Peek: Boolean; virtual;
71 procedure Open(); virtual; abstract;
72 procedure Close(); virtual; abstract;
73 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
74 function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual; abstract;
75 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
76 function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual;
77 procedure Write( const buf: TBytes); overload; inline;
78 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; inline;
79 procedure Write( const pBuf : Pointer; len : Integer); overload; inline;
80 procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract;
81 procedure Flush; virtual;
82 end;
83
84 TTransportException = class( TException)
85 public
86 type
87 TExceptionType = (
88 Unknown,
89 NotOpen,
90 AlreadyOpen,
91 TimedOut,
92 EndOfFile,
93 BadArgs,
94 Interrupted
95 );
96 private
97 function GetType: TExceptionType;
98 protected
99 constructor HiddenCreate(const Msg: string);
100 public
101 class function Create( AType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
102 class function Create( const msg: string): TTransportException; reintroduce; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
103 class function Create( AType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
104 property Type_: TExceptionType read GetType;
105 end;
106
107 // Needed to remove deprecation warning
108 TTransportExceptionSpecialized = class abstract (TTransportException)
109 public
110 constructor Create(const Msg: string);
111 end;
112
113 TTransportExceptionUnknown = class (TTransportExceptionSpecialized);
114 TTransportExceptionNotOpen = class (TTransportExceptionSpecialized);
115 TTransportExceptionAlreadyOpen = class (TTransportExceptionSpecialized);
116 TTransportExceptionTimedOut = class (TTransportExceptionSpecialized);
117 TTransportExceptionEndOfFile = class (TTransportExceptionSpecialized);
118 TTransportExceptionBadArgs = class (TTransportExceptionSpecialized);
119 TTransportExceptionInterrupted = class (TTransportExceptionSpecialized);
120
121 TSecureProtocol = (
122 SSL_2, SSL_3, TLS_1, // outdated, for compatibilty only
123 TLS_1_1, TLS_1_2 // secure (as of today)
124 );
125
126 TSecureProtocols = set of TSecureProtocol;
127
128 IHTTPClient = interface( ITransport )
129 ['{7BF615DD-8680-4004-A5B2-88947BA3BA3D}']
130 procedure SetDnsResolveTimeout(const Value: Integer);
131 function GetDnsResolveTimeout: Integer;
132 procedure SetConnectionTimeout(const Value: Integer);
133 function GetConnectionTimeout: Integer;
134 procedure SetSendTimeout(const Value: Integer);
135 function GetSendTimeout: Integer;
136 procedure SetReadTimeout(const Value: Integer);
137 function GetReadTimeout: Integer;
138 function GetCustomHeaders: IThriftDictionary<string,string>;
139 procedure SendRequest;
140 function GetSecureProtocols : TSecureProtocols;
141 procedure SetSecureProtocols( const value : TSecureProtocols);
142
143 property DnsResolveTimeout: Integer read GetDnsResolveTimeout write SetDnsResolveTimeout;
144 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
145 property SendTimeout: Integer read GetSendTimeout write SetSendTimeout;
146 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
147 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
148 property SecureProtocols : TSecureProtocols read GetSecureProtocols write SetSecureProtocols;
149 end;
150
151 IServerTransport = interface
152 ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
153 procedure Listen;
154 procedure Close;
155 function Accept( const fnAccepting: TProc): ITransport;
156 end;
157
158 TServerTransportImpl = class( TInterfacedObject, IServerTransport)
159 protected
160 procedure Listen; virtual; abstract;
161 procedure Close; virtual; abstract;
162 function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
163 end;
164
165 ITransportFactory = interface
166 ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
167 function GetTransport( const ATrans: ITransport): ITransport;
168 end;
169
170 TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
171 function GetTransport( const ATrans: ITransport): ITransport; virtual;
172 end;
173
174 TTcpSocketStreamImpl = class( TThriftStreamImpl )
175{$IFDEF OLD_SOCKETS}
176 private type
177 TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
178 private
179 FTcpClient : TCustomIpClient;
180 FTimeout : Integer;
181 function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
182 TimeOut: Integer; var wsaError : Integer): Integer;
183 function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
184 var wsaError, bytesReady : Integer): TWaitForData;
185{$ELSE}
186 FTcpClient: TSocket;
187 protected const
188 SLEEP_TIME = 200;
189{$ENDIF}
190 protected
191 procedure Write( const pBuf : Pointer; offset, count: Integer); override;
192 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
193 procedure Open; override;
194 procedure Close; override;
195 procedure Flush; override;
196
197 function IsOpen: Boolean; override;
198 function ToArray: TBytes; override;
199 public
200{$IFDEF OLD_SOCKETS}
201 constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);
202{$ELSE}
203 constructor Create( const ATcpClient: TSocket; const aTimeout : Longword = 0);
204{$ENDIF}
205 end;
206
207 IStreamTransport = interface( ITransport )
208 ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
209 function GetInputStream: IThriftStream;
210 function GetOutputStream: IThriftStream;
211 property InputStream : IThriftStream read GetInputStream;
212 property OutputStream : IThriftStream read GetOutputStream;
213 end;
214
215 TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
216 protected
217 FInputStream : IThriftStream;
218 FOutputStream : IThriftStream;
219 protected
220 function GetIsOpen: Boolean; override;
221
222 function GetInputStream: IThriftStream;
223 function GetOutputStream: IThriftStream;
224 public
225 property InputStream : IThriftStream read GetInputStream;
226 property OutputStream : IThriftStream read GetOutputStream;
227
228 procedure Open; override;
229 procedure Close; override;
230 procedure Flush; override;
231 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
232 procedure Write( const pBuf : Pointer; off, len : Integer); override;
233 constructor Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
234 destructor Destroy; override;
235 end;
236
237 TBufferedStreamImpl = class( TThriftStreamImpl)
238 private
239 FStream : IThriftStream;
240 FBufSize : Integer;
241 FReadBuffer : TMemoryStream;
242 FWriteBuffer : TMemoryStream;
243 protected
244 procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
245 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
246 procedure Open; override;
247 procedure Close; override;
248 procedure Flush; override;
249 function IsOpen: Boolean; override;
250 function ToArray: TBytes; override;
251 public
252 constructor Create( const AStream: IThriftStream; ABufSize: Integer);
253 destructor Destroy; override;
254 end;
255
256 TServerSocketImpl = class( TServerTransportImpl)
257 private
258{$IFDEF OLD_SOCKETS}
259 FServer : TTcpServer;
260 FPort : Integer;
261 FClientTimeout : Integer;
262{$ELSE}
263 FServer: TServerSocket;
264{$ENDIF}
265 FUseBufferedSocket : Boolean;
266 FOwnsServer : Boolean;
267 protected
268 function Accept( const fnAccepting: TProc) : ITransport; override;
269 public
270{$IFDEF OLD_SOCKETS}
271 constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
272 constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
273{$ELSE}
274 constructor Create( const AServer: TServerSocket; AClientTimeout: Longword = 0); overload;
275 constructor Create( APort: Integer; AClientTimeout: Longword = 0; AUseBufferedSockets: Boolean = FALSE); overload;
276{$ENDIF}
277 destructor Destroy; override;
278 procedure Listen; override;
279 procedure Close; override;
280 end;
281
282 TBufferedTransportImpl = class( TTransportImpl )
283 private
284 FInputBuffer : IThriftStream;
285 FOutputBuffer : IThriftStream;
286 FTransport : IStreamTransport;
287 FBufSize : Integer;
288
289 procedure InitBuffers;
290 function GetUnderlyingTransport: ITransport;
291 protected
292 function GetIsOpen: Boolean; override;
293 procedure Flush; override;
294 public
295 procedure Open(); override;
296 procedure Close(); override;
297 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
298 procedure Write( const pBuf : Pointer; off, len : Integer); override;
299 constructor Create( const ATransport : IStreamTransport ); overload;
300 constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;
301 property UnderlyingTransport: ITransport read GetUnderlyingTransport;
302 property IsOpen: Boolean read GetIsOpen;
303 end;
304
305 TSocketImpl = class(TStreamTransportImpl)
306 private
307{$IFDEF OLD_SOCKETS}
308 FClient : TCustomIpClient;
309{$ELSE}
310 FClient: TSocket;
311{$ENDIF}
312 FOwnsClient : Boolean;
313 FHost : string;
314 FPort : Integer;
315{$IFDEF OLD_SOCKETS}
316 FTimeout : Integer;
317{$ELSE}
318 FTimeout : Longword;
319{$ENDIF}
320
321 procedure InitSocket;
322 protected
323 function GetIsOpen: Boolean; override;
324 public
325 procedure Open; override;
326{$IFDEF OLD_SOCKETS}
327 constructor Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
328 constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
329{$ELSE}
330 constructor Create(const AClient: TSocket; aOwnsClient: Boolean); overload;
331 constructor Create( const AHost: string; APort: Integer; ATimeout: Longword = 0); overload;
332{$ENDIF}
333 destructor Destroy; override;
334 procedure Close; override;
335{$IFDEF OLD_SOCKETS}
336 property TcpClient: TCustomIpClient read FClient;
337{$ELSE}
338 property TcpClient: TSocket read FClient;
339{$ENDIF}
340 property Host : string read FHost;
341 property Port: Integer read FPort;
342 end;
343
344 TFramedTransportImpl = class( TTransportImpl)
345 private const
346 FHeaderSize : Integer = 4;
347 private class var
348 FHeader_Dummy : array of Byte;
349 protected
350 FTransport : ITransport;
351 FWriteBuffer : TMemoryStream;
352 FReadBuffer : TMemoryStream;
353
354 procedure InitWriteBuffer;
355 procedure ReadFrame;
356 public
357 type
358 TFactory = class( TTransportFactoryImpl )
359 public
360 function GetTransport( const ATrans: ITransport): ITransport; override;
361 end;
362
363 {$IFDEF HAVE_CLASS_CTOR}
364 class constructor Create;
365 {$ENDIF}
366
367 constructor Create; overload;
368 constructor Create( const ATrans: ITransport); overload;
369 destructor Destroy; override;
370
371 procedure Open(); override;
372 function GetIsOpen: Boolean; override;
373
374 procedure Close(); override;
375 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
376 procedure Write( const pBuf : Pointer; off, len : Integer); override;
377 procedure Flush; override;
378 end;
379
380{$IFNDEF HAVE_CLASS_CTOR}
381procedure TFramedTransportImpl_Initialize;
382{$ENDIF}
383
384const
385 DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
386 DEFAULT_THRIFT_SECUREPROTOCOLS = [ TSecureProtocol.TLS_1_1, TSecureProtocol.TLS_1_2];
387
388
389
390implementation
391
392{ TTransportImpl }
393
394procedure TTransportImpl.Flush;
395begin
396 // nothing to do
397end;
398
399function TTransportImpl.Peek: Boolean;
400begin
401 Result := IsOpen;
402end;
403
404function TTransportImpl.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
405begin
406 if Length(buf) > 0
407 then result := Read( @buf[0], Length(buf), off, len)
408 else result := 0;
409end;
410
411function TTransportImpl.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
412begin
413 if Length(buf) > 0
414 then result := ReadAll( @buf[0], Length(buf), off, len)
415 else result := 0;
416end;
417
418procedure TTransportImpl.Write( const buf: TBytes);
419begin
420 if Length(buf) > 0
421 then Write( @buf[0], 0, Length(buf));
422end;
423
424procedure TTransportImpl.Write( const buf: TBytes; off: Integer; len: Integer);
425begin
426 if Length(buf) > 0
427 then Write( @buf[0], off, len);
428end;
429
430function TTransportImpl.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
431var ret : Integer;
432begin
433 result := 0;
434 while result < len do begin
435 ret := Read( pBuf, buflen, off + result, len - result);
436 if ret > 0
437 then Inc( result, ret)
438 else raise TTransportExceptionNotOpen.Create( 'Cannot read, Remote side has closed' );
439 end;
440end;
441
442procedure TTransportImpl.Write( const pBuf : Pointer; len : Integer);
443begin
444 Self.Write( pBuf, 0, len);
445end;
446
447{ TTransportException }
448
449function TTransportException.GetType: TExceptionType;
450begin
451 if Self is TTransportExceptionNotOpen then Result := TExceptionType.NotOpen
452 else if Self is TTransportExceptionAlreadyOpen then Result := TExceptionType.AlreadyOpen
453 else if Self is TTransportExceptionTimedOut then Result := TExceptionType.TimedOut
454 else if Self is TTransportExceptionEndOfFile then Result := TExceptionType.EndOfFile
455 else if Self is TTransportExceptionBadArgs then Result := TExceptionType.BadArgs
456 else if Self is TTransportExceptionInterrupted then Result := TExceptionType.Interrupted
457 else Result := TExceptionType.Unknown;
458end;
459
460constructor TTransportException.HiddenCreate(const Msg: string);
461begin
462 inherited Create(Msg);
463end;
464
465class function TTransportException.Create(AType: TExceptionType): TTransportException;
466begin
467 //no inherited;
468{$WARN SYMBOL_DEPRECATED OFF}
469 Result := Create(AType, '')
470{$WARN SYMBOL_DEPRECATED DEFAULT}
471end;
472
473class function TTransportException.Create(AType: TExceptionType;
474 const msg: string): TTransportException;
475begin
476 case AType of
477 TExceptionType.NotOpen: Result := TTransportExceptionNotOpen.Create(msg);
478 TExceptionType.AlreadyOpen: Result := TTransportExceptionAlreadyOpen.Create(msg);
479 TExceptionType.TimedOut: Result := TTransportExceptionTimedOut.Create(msg);
480 TExceptionType.EndOfFile: Result := TTransportExceptionEndOfFile.Create(msg);
481 TExceptionType.BadArgs: Result := TTransportExceptionBadArgs.Create(msg);
482 TExceptionType.Interrupted: Result := TTransportExceptionInterrupted.Create(msg);
483 else
484 Result := TTransportExceptionUnknown.Create(msg);
485 end;
486end;
487
488class function TTransportException.Create(const msg: string): TTransportException;
489begin
490 Result := TTransportExceptionUnknown.Create(Msg);
491end;
492
493{ TTransportExceptionSpecialized }
494
495constructor TTransportExceptionSpecialized.Create(const Msg: string);
496begin
497 inherited HiddenCreate(Msg);
498end;
499
500{ TTransportFactoryImpl }
501
502function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
503begin
504 Result := ATrans;
505end;
506
507{ TServerSocket }
508
509{$IFDEF OLD_SOCKETS}
510constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
511begin
512 inherited Create;
513 FServer := AServer;
514 FClientTimeout := AClientTimeout;
515end;
516{$ELSE}
517constructor TServerSocketImpl.Create( const AServer: TServerSocket; AClientTimeout: Longword);
518begin
519 inherited Create;
520 FServer := AServer;
521 FServer.RecvTimeout := AClientTimeout;
522 FServer.SendTimeout := AClientTimeout;
523end;
524{$ENDIF}
525
526{$IFDEF OLD_SOCKETS}
527constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
528{$ELSE}
529constructor TServerSocketImpl.Create(APort: Integer; AClientTimeout: Longword; AUseBufferedSockets: Boolean);
530{$ENDIF}
531begin
532 inherited Create;
533{$IFDEF OLD_SOCKETS}
534 FPort := APort;
535 FClientTimeout := AClientTimeout;
536 FServer := TTcpServer.Create( nil );
537 FServer.BlockMode := bmBlocking;
538 {$IF CompilerVersion >= 21.0}
539 FServer.LocalPort := AnsiString( IntToStr( FPort));
540 {$ELSE}
541 FServer.LocalPort := IntToStr( FPort);
542 {$IFEND}
543{$ELSE}
544 FServer := TServerSocket.Create(APort, AClientTimeout, AClientTimeout);
545{$ENDIF}
546 FUseBufferedSocket := AUseBufferedSockets;
547 FOwnsServer := True;
548end;
549
550destructor TServerSocketImpl.Destroy;
551begin
552 if FOwnsServer then begin
553 FServer.Free;
554 FServer := nil;
555 end;
556 inherited;
557end;
558
559function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
560var
561{$IFDEF OLD_SOCKETS}
562 client : TCustomIpClient;
563{$ELSE}
564 client: TSocket;
565{$ENDIF}
566 trans : IStreamTransport;
567begin
568 if FServer = nil then begin
569 raise TTransportExceptionNotOpen.Create('No underlying server socket.');
570 end;
571
572{$IFDEF OLD_SOCKETS}
573 client := nil;
574 try
575 client := TCustomIpClient.Create(nil);
576
577 if Assigned(fnAccepting)
578 then fnAccepting();
579
580 if not FServer.Accept( client) then begin
581 client.Free;
582 Result := nil;
583 Exit;
584 end;
585
586 if client = nil then begin
587 Result := nil;
588 Exit;
589 end;
590
591 trans := TSocketImpl.Create( client, TRUE, FClientTimeout);
592 client := nil; // trans owns it now
593
594 if FUseBufferedSocket
595 then result := TBufferedTransportImpl.Create( trans)
596 else result := trans;
597
598 except
599 on E: Exception do begin
600 client.Free;
601 raise TTransportExceptionUnknown.Create(E.ToString);
602 end;
603 end;
604{$ELSE}
605 if Assigned(fnAccepting) then
606 fnAccepting();
607
608 client := FServer.Accept;
609 try
610 trans := TSocketImpl.Create(client, True);
611 client := nil;
612
613 if FUseBufferedSocket then
614 Result := TBufferedTransportImpl.Create(trans)
615 else
616 Result := trans;
617 except
618 client.Free;
619 raise;
620 end;
621{$ENDIF}
622end;
623
624procedure TServerSocketImpl.Listen;
625begin
626 if FServer <> nil then
627 begin
628{$IFDEF OLD_SOCKETS}
629 try
630 FServer.Active := True;
631 except
632 on E: Exception
633 do raise TTransportExceptionUnknown.Create('Could not accept on listening socket: ' + E.Message);
634 end;
635{$ELSE}
636 FServer.Listen;
637{$ENDIF}
638 end;
639end;
640
641procedure TServerSocketImpl.Close;
642begin
643 if FServer <> nil then
644{$IFDEF OLD_SOCKETS}
645 try
646 FServer.Active := False;
647 except
648 on E: Exception
649 do raise TTransportExceptionUnknown.Create('Error on closing socket : ' + E.Message);
650 end;
651{$ELSE}
652 FServer.Close;
653{$ENDIF}
654end;
655
656{ TSocket }
657
658{$IFDEF OLD_SOCKETS}
659constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
660var stream : IThriftStream;
661begin
662 FClient := AClient;
663 FTimeout := ATimeout;
664 FOwnsClient := aOwnsClient;
665 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
666 inherited Create( stream, stream);
667end;
668{$ELSE}
669constructor TSocketImpl.Create(const AClient: TSocket; aOwnsClient: Boolean);
670var stream : IThriftStream;
671begin
672 FClient := AClient;
673 FTimeout := AClient.RecvTimeout;
674 FOwnsClient := aOwnsClient;
675 stream := TTcpSocketStreamImpl.Create(FClient, FTimeout);
676 inherited Create(stream, stream);
677end;
678{$ENDIF}
679
680{$IFDEF OLD_SOCKETS}
681constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
682{$ELSE}
683constructor TSocketImpl.Create(const AHost: string; APort: Integer; ATimeout: Longword);
684{$ENDIF}
685begin
686 inherited Create(nil,nil);
687 FHost := AHost;
688 FPort := APort;
689 FTimeout := ATimeout;
690 InitSocket;
691end;
692
693destructor TSocketImpl.Destroy;
694begin
695 if FOwnsClient
696 then FreeAndNil( FClient);
697 inherited;
698end;
699
700procedure TSocketImpl.Close;
701begin
702 inherited Close;
703
704 FInputStream := nil;
705 FOutputStream := nil;
706
707 if FOwnsClient
708 then FreeAndNil( FClient)
709 else FClient := nil;
710end;
711
712function TSocketImpl.GetIsOpen: Boolean;
713begin
714{$IFDEF OLD_SOCKETS}
715 Result := (FClient <> nil) and FClient.Connected;
716{$ELSE}
717 Result := (FClient <> nil) and FClient.IsOpen
718{$ENDIF}
719end;
720
721procedure TSocketImpl.InitSocket;
722var
723 stream : IThriftStream;
724begin
725 if FOwnsClient
726 then FreeAndNil( FClient)
727 else FClient := nil;
728
729{$IFDEF OLD_SOCKETS}
730 FClient := TTcpClient.Create( nil);
731{$ELSE}
732 FClient := TSocket.Create(FHost, FPort);
733{$ENDIF}
734 FOwnsClient := True;
735
736 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
737 FInputStream := stream;
738 FOutputStream := stream;
739end;
740
741procedure TSocketImpl.Open;
742begin
743 if IsOpen then begin
744 raise TTransportExceptionAlreadyOpen.Create('Socket already connected');
745 end;
746
747 if FHost = '' then begin
748 raise TTransportExceptionNotOpen.Create('Cannot open null host');
749 end;
750
751 if Port <= 0 then begin
752 raise TTransportExceptionNotOpen.Create('Cannot open without port');
753 end;
754
755 if FClient = nil
756 then InitSocket;
757
758{$IFDEF OLD_SOCKETS}
759 FClient.RemoteHost := TSocketHost( Host);
760 FClient.RemotePort := TSocketPort( IntToStr( Port));
761 FClient.Connect;
762{$ELSE}
763 FClient.Open;
764{$ENDIF}
765
766 FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
767 FOutputStream := FInputStream;
768end;
769
770{ TBufferedStream }
771
772procedure TBufferedStreamImpl.Close;
773begin
774 Flush;
775 FStream := nil;
776
777 FReadBuffer.Free;
778 FReadBuffer := nil;
779
780 FWriteBuffer.Free;
781 FWriteBuffer := nil;
782end;
783
784constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
785begin
786 inherited Create;
787 FStream := AStream;
788 FBufSize := ABufSize;
789 FReadBuffer := TMemoryStream.Create;
790 FWriteBuffer := TMemoryStream.Create;
791end;
792
793destructor TBufferedStreamImpl.Destroy;
794begin
795 Close;
796 inherited;
797end;
798
799procedure TBufferedStreamImpl.Flush;
800var
801 buf : TBytes;
802 len : Integer;
803begin
804 if IsOpen then begin
805 len := FWriteBuffer.Size;
806 if len > 0 then begin
807 SetLength( buf, len );
808 FWriteBuffer.Position := 0;
809 FWriteBuffer.Read( Pointer(@buf[0])^, len );
810 FStream.Write( buf, 0, len );
811 end;
812 FWriteBuffer.Clear;
813 end;
814end;
815
816function TBufferedStreamImpl.IsOpen: Boolean;
817begin
818 Result := (FWriteBuffer <> nil)
819 and (FReadBuffer <> nil)
820 and (FStream <> nil)
821 and FStream.IsOpen;
822end;
823
824procedure TBufferedStreamImpl.Open;
825begin
826 FStream.Open;
827end;
828
829function TBufferedStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
830var
831 nRead : Integer;
832 tempbuf : TBytes;
833 pTmp : PByte;
834begin
835 inherited;
836 Result := 0;
837
838 if IsOpen then begin
839 while count > 0 do begin
840
841 if FReadBuffer.Position >= FReadBuffer.Size then begin
842 FReadBuffer.Clear;
843 SetLength( tempbuf, FBufSize);
844 nRead := FStream.Read( tempbuf, 0, FBufSize );
845 if nRead = 0 then Break; // avoid infinite loop
846
847 FReadBuffer.WriteBuffer( Pointer(@tempbuf[0])^, nRead );
848 FReadBuffer.Position := 0;
849 end;
850
851 if FReadBuffer.Position < FReadBuffer.Size then begin
852 nRead := Min( FReadBuffer.Size - FReadBuffer.Position, count);
853 pTmp := pBuf;
854 Inc( pTmp, offset);
855 Inc( Result, FReadBuffer.Read( pTmp^, nRead));
856 Dec( count, nRead);
857 Inc( offset, nRead);
858 end;
859 end;
860 end;
861end;
862
863function TBufferedStreamImpl.ToArray: TBytes;
864var len : Integer;
865begin
866 len := 0;
867
868 if IsOpen then begin
869 len := FReadBuffer.Size;
870 end;
871
872 SetLength( Result, len);
873
874 if len > 0 then begin
875 FReadBuffer.Position := 0;
876 FReadBuffer.Read( Pointer(@Result[0])^, len );
877 end;
878end;
879
880procedure TBufferedStreamImpl.Write( const pBuf : Pointer; offset: Integer; count: Integer);
881var pTmp : PByte;
882begin
883 inherited;
884 if count > 0 then begin
885 if IsOpen then begin
886 pTmp := pBuf;
887 Inc( pTmp, offset);
888 FWriteBuffer.Write( pTmp^, count );
889 if FWriteBuffer.Size > FBufSize then begin
890 Flush;
891 end;
892 end;
893 end;
894end;
895
896{ TStreamTransportImpl }
897
898constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
899begin
900 inherited Create;
901 FInputStream := AInputStream;
902 FOutputStream := AOutputStream;
903end;
904
905destructor TStreamTransportImpl.Destroy;
906begin
907 FInputStream := nil;
908 FOutputStream := nil;
909 inherited;
910end;
911
912procedure TStreamTransportImpl.Close;
913begin
914 FInputStream := nil;
915 FOutputStream := nil;
916end;
917
918procedure TStreamTransportImpl.Flush;
919begin
920 if FOutputStream = nil then begin
921 raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );
922 end;
923
924 FOutputStream.Flush;
925end;
926
927function TStreamTransportImpl.GetInputStream: IThriftStream;
928begin
929 Result := FInputStream;
930end;
931
932function TStreamTransportImpl.GetIsOpen: Boolean;
933begin
934 Result := True;
935end;
936
937function TStreamTransportImpl.GetOutputStream: IThriftStream;
938begin
939 Result := FOutputStream;
940end;
941
942procedure TStreamTransportImpl.Open;
943begin
944
945end;
946
947function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
948begin
949 if FInputStream = nil then begin
950 raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
951 end;
952
953 Result := FInputStream.Read( pBuf,buflen, off, len );
954end;
955
956procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
957begin
958 if FOutputStream = nil then begin
959 raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );
960 end;
961
962 FOutputStream.Write( pBuf, off, len );
963end;
964
965{ TBufferedTransportImpl }
966
967constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
968begin
969 //no inherited;
970 Create( ATransport, 1024 );
971end;
972
973constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport; ABufSize: Integer);
974begin
975 inherited Create;
976 FTransport := ATransport;
977 FBufSize := ABufSize;
978 InitBuffers;
979end;
980
981procedure TBufferedTransportImpl.Close;
982begin
983 FTransport.Close;
984 FInputBuffer := nil;
985 FOutputBuffer := nil;
986end;
987
988procedure TBufferedTransportImpl.Flush;
989begin
990 if FOutputBuffer <> nil then begin
991 FOutputBuffer.Flush;
992 end;
993end;
994
995function TBufferedTransportImpl.GetIsOpen: Boolean;
996begin
997 Result := FTransport.IsOpen;
998end;
999
1000function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
1001begin
1002 Result := FTransport;
1003end;
1004
1005procedure TBufferedTransportImpl.InitBuffers;
1006begin
1007 if FTransport.InputStream <> nil then begin
1008 FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize );
1009 end;
1010 if FTransport.OutputStream <> nil then begin
1011 FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize );
1012 end;
1013end;
1014
1015procedure TBufferedTransportImpl.Open;
1016begin
1017 FTransport.Open;
1018 InitBuffers; // we need to get the buffers to match FTransport substreams again
1019end;
1020
1021function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
1022begin
1023 Result := 0;
1024 if FInputBuffer <> nil then begin
1025 Result := FInputBuffer.Read( pBuf,buflen, off, len );
1026 end;
1027end;
1028
1029procedure TBufferedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
1030begin
1031 if FOutputBuffer <> nil then begin
1032 FOutputBuffer.Write( pBuf, off, len );
1033 end;
1034end;
1035
1036{ TFramedTransportImpl }
1037
1038{$IFDEF HAVE_CLASS_CTOR}
1039class constructor TFramedTransportImpl.Create;
1040begin
1041 SetLength( FHeader_Dummy, FHeaderSize);
1042 FillChar( FHeader_Dummy[0], Length( FHeader_Dummy) * SizeOf( Byte ), 0);
1043end;
1044{$ELSE}
1045procedure TFramedTransportImpl_Initialize;
1046begin
1047 SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
1048 FillChar( TFramedTransportImpl.FHeader_Dummy[0],
1049 Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte ), 0);
1050end;
1051{$ENDIF}
1052
1053constructor TFramedTransportImpl.Create;
1054begin
1055 inherited Create;
1056 InitWriteBuffer;
1057end;
1058
1059procedure TFramedTransportImpl.Close;
1060begin
1061 FTransport.Close;
1062end;
1063
1064constructor TFramedTransportImpl.Create( const ATrans: ITransport);
1065begin
1066 inherited Create;
1067 InitWriteBuffer;
1068 FTransport := ATrans;
1069end;
1070
1071destructor TFramedTransportImpl.Destroy;
1072begin
1073 FWriteBuffer.Free;
1074 FReadBuffer.Free;
1075 inherited;
1076end;
1077
1078procedure TFramedTransportImpl.Flush;
1079var
1080 buf : TBytes;
1081 len : Integer;
1082 data_len : Integer;
1083
1084begin
1085 len := FWriteBuffer.Size;
1086 SetLength( buf, len);
1087 if len > 0 then begin
1088 System.Move( FWriteBuffer.Memory^, buf[0], len );
1089 end;
1090
1091 data_len := len - FHeaderSize;
1092 if (data_len < 0) then begin
1093 raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: data_len < 0' );
1094 end;
1095
1096 InitWriteBuffer;
1097
1098 buf[0] := Byte($FF and (data_len shr 24));
1099 buf[1] := Byte($FF and (data_len shr 16));
1100 buf[2] := Byte($FF and (data_len shr 8));
1101 buf[3] := Byte($FF and data_len);
1102
1103 FTransport.Write( buf, 0, len );
1104 FTransport.Flush;
1105end;
1106
1107function TFramedTransportImpl.GetIsOpen: Boolean;
1108begin
1109 Result := FTransport.IsOpen;
1110end;
1111
1112type
1113 TAccessMemoryStream = class(TMemoryStream)
1114 end;
1115
1116procedure TFramedTransportImpl.InitWriteBuffer;
1117begin
1118 FWriteBuffer.Free;
1119 FWriteBuffer := TMemoryStream.Create;
1120 TAccessMemoryStream(FWriteBuffer).Capacity := 1024;
1121 FWriteBuffer.Write( Pointer(@FHeader_Dummy[0])^, FHeaderSize);
1122end;
1123
1124procedure TFramedTransportImpl.Open;
1125begin
1126 FTransport.Open;
1127end;
1128
1129function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
1130var pTmp : PByte;
1131begin
1132 if len > (buflen-off)
1133 then len := buflen-off;
1134
1135 pTmp := pBuf;
1136 Inc( pTmp, off);
1137
1138 if (FReadBuffer <> nil) and (len > 0) then begin
1139 result := FReadBuffer.Read( pTmp^, len);
1140 if result > 0 then begin
1141 Exit;
1142 end;
1143 end;
1144
1145 ReadFrame;
1146 if len > 0
1147 then Result := FReadBuffer.Read( pTmp^, len)
1148 else Result := 0;
1149end;
1150
1151procedure TFramedTransportImpl.ReadFrame;
1152var
1153 i32rd : TBytes;
1154 size : Integer;
1155 buff : TBytes;
1156begin
1157 SetLength( i32rd, FHeaderSize );
1158 FTransport.ReadAll( i32rd, 0, FHeaderSize);
1159 size :=
1160 ((i32rd[0] and $FF) shl 24) or
1161 ((i32rd[1] and $FF) shl 16) or
1162 ((i32rd[2] and $FF) shl 8) or
1163 (i32rd[3] and $FF);
1164 SetLength( buff, size );
1165 FTransport.ReadAll( buff, 0, size );
1166 FReadBuffer.Free;
1167 FReadBuffer := TMemoryStream.Create;
1168 if Length(buff) > 0
1169 then FReadBuffer.Write( Pointer(@buff[0])^, size );
1170 FReadBuffer.Position := 0;
1171end;
1172
1173procedure TFramedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
1174var pTmp : PByte;
1175begin
1176 if len > 0 then begin
1177 pTmp := pBuf;
1178 Inc( pTmp, off);
1179
1180 FWriteBuffer.Write( pTmp^, len );
1181 end;
1182end;
1183
1184{ TFramedTransport.TFactory }
1185
1186function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
1187begin
1188 Result := TFramedTransportImpl.Create( ATrans );
1189end;
1190
1191{ TTcpSocketStreamImpl }
1192
1193procedure TTcpSocketStreamImpl.Close;
1194begin
1195 FTcpClient.Close;
1196end;
1197
1198{$IFDEF OLD_SOCKETS}
1199constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
1200begin
1201 inherited Create;
1202 FTcpClient := ATcpClient;
1203 FTimeout := aTimeout;
1204end;
1205{$ELSE}
1206constructor TTcpSocketStreamImpl.Create( const ATcpClient: TSocket; const aTimeout : Longword);
1207begin
1208 inherited Create;
1209 FTcpClient := ATcpClient;
1210 if aTimeout = 0 then
1211 FTcpClient.RecvTimeout := SLEEP_TIME
1212 else
1213 FTcpClient.RecvTimeout := aTimeout;
1214 FTcpClient.SendTimeout := aTimeout;
1215end;
1216{$ENDIF}
1217
1218procedure TTcpSocketStreamImpl.Flush;
1219begin
1220
1221end;
1222
1223function TTcpSocketStreamImpl.IsOpen: Boolean;
1224begin
1225{$IFDEF OLD_SOCKETS}
1226 Result := FTcpClient.Active;
1227{$ELSE}
1228 Result := FTcpClient.IsOpen;
1229{$ENDIF}
1230end;
1231
1232procedure TTcpSocketStreamImpl.Open;
1233begin
1234 FTcpClient.Open;
1235end;
1236
1237
1238{$IFDEF OLD_SOCKETS}
1239function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
1240 TimeOut: Integer; var wsaError : Integer): Integer;
1241var
1242 ReadFds: TFDset;
1243 ReadFdsptr: PFDset;
1244 WriteFds: TFDset;
1245 WriteFdsptr: PFDset;
1246 ExceptFds: TFDset;
1247 ExceptFdsptr: PFDset;
1248 tv: timeval;
1249 Timeptr: PTimeval;
1250 socket : TSocket;
1251begin
1252 if not FTcpClient.Active then begin
1253 wsaError := WSAEINVAL;
1254 Exit( SOCKET_ERROR);
1255 end;
1256
1257 socket := FTcpClient.Handle;
1258
1259 if Assigned(ReadReady) then begin
1260 ReadFdsptr := @ReadFds;
1261 FD_ZERO(ReadFds);
1262 FD_SET(socket, ReadFds);
1263 end
1264 else begin
1265 ReadFdsptr := nil;
1266 end;
1267
1268 if Assigned(WriteReady) then begin
1269 WriteFdsptr := @WriteFds;
1270 FD_ZERO(WriteFds);
1271 FD_SET(socket, WriteFds);
1272 end
1273 else begin
1274 WriteFdsptr := nil;
1275 end;
1276
1277 if Assigned(ExceptFlag) then begin
1278 ExceptFdsptr := @ExceptFds;
1279 FD_ZERO(ExceptFds);
1280 FD_SET(socket, ExceptFds);
1281 end
1282 else begin
1283 ExceptFdsptr := nil;
1284 end;
1285
1286 if TimeOut >= 0 then begin
1287 tv.tv_sec := TimeOut div 1000;
1288 tv.tv_usec := 1000 * (TimeOut mod 1000);
1289 Timeptr := @tv;
1290 end
1291 else begin
1292 Timeptr := nil; // wait forever
1293 end;
1294
1295 wsaError := 0;
1296 try
1297 {$IFDEF MSWINDOWS}
1298 {$IFDEF OLD_UNIT_NAMES}
1299 result := WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1300 {$ELSE}
1301 result := Winapi.WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1302 {$ENDIF}
1303 {$ENDIF}
1304 {$IFDEF LINUX}
1305 result := Libc.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1306 {$ENDIF}
1307
1308 if result = SOCKET_ERROR
1309 then wsaError := WSAGetLastError;
1310
1311 except
1312 result := SOCKET_ERROR;
1313 end;
1314
1315 if Assigned(ReadReady) then
1316 ReadReady^ := FD_ISSET(socket, ReadFds);
1317
1318 if Assigned(WriteReady) then
1319 WriteReady^ := FD_ISSET(socket, WriteFds);
1320
1321 if Assigned(ExceptFlag) then
1322 ExceptFlag^ := FD_ISSET(socket, ExceptFds);
1323end;
1324{$ENDIF}
1325
1326{$IFDEF OLD_SOCKETS}
1327function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
1328 DesiredBytes : Integer;
1329 var wsaError, bytesReady : Integer): TWaitForData;
1330var bCanRead, bError : Boolean;
1331 retval : Integer;
1332const
1333 MSG_PEEK = {$IFDEF OLD_UNIT_NAMES} WinSock.MSG_PEEK {$ELSE} Winapi.WinSock.MSG_PEEK {$ENDIF};
1334begin
1335 bytesReady := 0;
1336
1337 // The select function returns the total number of socket handles that are ready
1338 // and contained in the fd_set structures, zero if the time limit expired,
1339 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1340 // WSAGetLastError can be used to retrieve a specific error code.
1341 retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
1342 if retval = SOCKET_ERROR
1343 then Exit( TWaitForData.wfd_Error);
1344 if (retval = 0) or not bCanRead
1345 then Exit( TWaitForData.wfd_Timeout);
1346
1347 // recv() returns the number of bytes received, or -1 if an error occurred.
1348 // The return value will be 0 when the peer has performed an orderly shutdown.
1349
1350 retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, MSG_PEEK);
1351 if retval <= 0
1352 then Exit( TWaitForData.wfd_Error);
1353
1354 // at least we have some data
1355 bytesReady := Min( retval, DesiredBytes);
1356 result := TWaitForData.wfd_HaveData;
1357end;
1358{$ENDIF}
1359
1360{$IFDEF OLD_SOCKETS}
1361function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
1362// old sockets version
1363var wfd : TWaitForData;
1364 wsaError,
1365 msecs : Integer;
1366 nBytes : Integer;
1367 pTmp : PByte;
1368begin
1369 inherited;
1370
1371 if FTimeout > 0
1372 then msecs := FTimeout
1373 else msecs := DEFAULT_THRIFT_TIMEOUT;
1374
1375 result := 0;
1376 pTmp := pBuf;
1377 Inc( pTmp, offset);
1378 while count > 0 do begin
1379
1380 while TRUE do begin
1381 wfd := WaitForData( msecs, pTmp, count, wsaError, nBytes);
1382 case wfd of
1383 TWaitForData.wfd_Error : Exit;
1384 TWaitForData.wfd_HaveData : Break;
1385 TWaitForData.wfd_Timeout : begin
1386 if (FTimeout = 0)
1387 then Exit
1388 else begin
1389 raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
1390
1391 end;
1392 end;
1393 else
1394 ASSERT( FALSE);
1395 end;
1396 end;
1397
1398 // reduce the timeout once we got data
1399 if FTimeout > 0
1400 then msecs := FTimeout div 10
1401 else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
1402 msecs := Max( msecs, 200);
1403
1404 ASSERT( nBytes <= count);
1405 nBytes := FTcpClient.ReceiveBuf( pTmp^, nBytes);
1406 Inc( pTmp, nBytes);
1407 Dec( count, nBytes);
1408 Inc( result, nBytes);
1409 end;
1410end;
1411
1412function TTcpSocketStreamImpl.ToArray: TBytes;
1413// old sockets version
1414var len : Integer;
1415begin
1416 len := 0;
1417 if IsOpen then begin
1418 len := FTcpClient.BytesReceived;
1419 end;
1420
1421 SetLength( Result, len );
1422
1423 if len > 0 then begin
1424 FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);
1425 end;
1426end;
1427
1428procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
1429// old sockets version
1430var bCanWrite, bError : Boolean;
1431 retval, wsaError : Integer;
1432 pTmp : PByte;
1433begin
1434 inherited;
1435
1436 if not FTcpClient.Active
1437 then raise TTransportExceptionNotOpen.Create('not open');
1438
1439 // The select function returns the total number of socket handles that are ready
1440 // and contained in the fd_set structures, zero if the time limit expired,
1441 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1442 // WSAGetLastError can be used to retrieve a specific error code.
1443 retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);
1444 if retval = SOCKET_ERROR
1445 then raise TTransportExceptionUnknown.Create(SysErrorMessage(Cardinal(wsaError)));
1446
1447 if (retval = 0)
1448 then raise TTransportExceptionTimedOut.Create('timed out');
1449
1450 if bError or not bCanWrite
1451 then raise TTransportExceptionUnknown.Create('unknown error');
1452
1453 pTmp := pBuf;
1454 Inc( pTmp, offset);
1455 FTcpClient.SendBuf( pTmp^, count);
1456end;
1457
1458{$ELSE}
1459
1460function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
1461// new sockets version
1462var nBytes : Integer;
1463 pTmp : PByte;
1464begin
1465 inherited;
1466
1467 result := 0;
1468 pTmp := pBuf;
1469 Inc( pTmp, offset);
1470 while count > 0 do begin
1471 nBytes := FTcpClient.Read( pTmp^, count);
1472 if nBytes = 0 then Exit;
1473 Inc( pTmp, nBytes);
1474 Dec( count, nBytes);
1475 Inc( result, nBytes);
1476 end;
1477end;
1478
1479function TTcpSocketStreamImpl.ToArray: TBytes;
1480// new sockets version
1481var len : Integer;
1482begin
1483 len := 0;
1484 try
1485 if FTcpClient.Peek then
1486 repeat
1487 SetLength(Result, Length(Result) + 1024);
1488 len := FTcpClient.Read(Result[Length(Result) - 1024], 1024);
1489 until len < 1024;
1490 except
1491 on TTransportException do begin { don't allow default exceptions } end;
1492 else raise;
1493 end;
1494 if len > 0 then
1495 SetLength(Result, Length(Result) - 1024 + len);
1496end;
1497
1498procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
1499// new sockets version
1500var pTmp : PByte;
1501begin
1502 inherited;
1503
1504 if not FTcpClient.IsOpen
1505 then raise TTransportExceptionNotOpen.Create('not open');
1506
1507 pTmp := pBuf;
1508 Inc( pTmp, offset);
1509 FTcpClient.Write( pTmp^, count);
1510end;
1511
1512{$ENDIF}
1513
1514
1515{$IF CompilerVersion < 21.0}
1516initialization
1517begin
1518 TFramedTransportImpl_Initialize;
1519end;
1520{$IFEND}
1521
1522
1523end.