]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | (* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | *) | |
19 | unit Thrift.Transport; | |
20 | ||
21 | {$I Thrift.Defines.inc} | |
22 | {$SCOPEDENUMS ON} | |
23 | ||
24 | interface | |
25 | ||
26 | uses | |
27 | Classes, | |
28 | SysUtils, | |
29 | Math, | |
30 | Generics.Collections, | |
31 | {$IFDEF OLD_UNIT_NAMES} | |
32 | WinSock, Sockets, | |
33 | {$ELSE} | |
34 | Winapi.WinSock, | |
35 | {$IFDEF OLD_SOCKETS} | |
36 | Web.Win.Sockets, | |
37 | {$ELSE} | |
38 | Thrift.Socket, | |
39 | {$ENDIF} | |
40 | {$ENDIF} | |
41 | Thrift.Collections, | |
42 | Thrift.Exception, | |
43 | Thrift.Utils, | |
44 | Thrift.WinHTTP, | |
45 | Thrift.Stream; | |
46 | ||
47 | type | |
// Byte-oriented transport contract: open/close state handling plus read and
// write operations, each offered for TBytes buffers and for raw pointers.
ITransport = interface
  ['{DB84961E-8BB3-4532-99E1-A8C7AC2300F7}']
  function GetIsOpen: Boolean;
  property IsOpen: Boolean read GetIsOpen;
  function Peek: Boolean;
  procedure Open;
  procedure Close;
  // Read: pointer flavour additionally receives the total buffer size (buflen).
  function Read( var buf: TBytes; off, len: Integer): Integer; overload;
  function Read( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; overload;
  function ReadAll( var buf: TBytes; off, len: Integer): Integer; overload;
  function ReadAll( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; overload;
  procedure Write( const buf: TBytes); overload;
  procedure Write( const buf: TBytes; off, len: Integer); overload;
  procedure Write( const pBuf: Pointer; off, len: Integer); overload;
  procedure Write( const pBuf: Pointer; len: Integer); overload;
  procedure Flush;
end;
65 | ||
// Abstract base class for ITransport implementations.
// Descendants must supply GetIsOpen, Open, Close and the pointer based
// Read/Write; the TBytes overloads and Write(pBuf, len) are inline wrappers
// implemented in this unit on top of those.
TTransportImpl = class( TInterfacedObject, ITransport)
protected
  function GetIsOpen: Boolean; virtual; abstract;
  property IsOpen: Boolean read GetIsOpen;
  function Peek: Boolean; virtual;
  procedure Open(); virtual; abstract;
  procedure Close(); virtual; abstract;
  function Read( var buf: TBytes; off, len: Integer): Integer; overload; inline;
  function Read( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; overload; virtual; abstract;
  function ReadAll( var buf: TBytes; off, len: Integer): Integer; overload; inline;
  function ReadAll( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; overload; virtual;
  procedure Write( const buf: TBytes); overload; inline;
  procedure Write( const buf: TBytes; off, len: Integer); overload; inline;
  procedure Write( const pBuf: Pointer; len: Integer); overload; inline;
  procedure Write( const pBuf: Pointer; off, len: Integer); overload; virtual; abstract;
  procedure Flush; virtual;
end;
83 | ||
// Base class of all transport exceptions.
// The Create class functions are deprecated factories that hand out one of
// the specialized descendants; Type_ maps the runtime class of an instance
// back to the matching TExceptionType value.
TTransportException = class( TException)
public
  type
    TExceptionType = (
      Unknown,
      NotOpen,
      AlreadyOpen,
      TimedOut,
      EndOfFile,
      BadArgs,
      Interrupted
    );
private
  function GetType: TExceptionType;
protected
  // real constructor, reachable for descendants only
  constructor HiddenCreate( const Msg: string);
public
  class function Create( AType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
  class function Create( const msg: string): TTransportException; reintroduce; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
  class function Create( AType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
  property Type_: TExceptionType read GetType;
end;
106 | ||
// Common ancestor of the concrete transport exceptions below. It declares a
// regular, non-deprecated constructor, which is what removes the deprecation
// warning that the class functions of TTransportException would trigger.
TTransportExceptionSpecialized = class abstract( TTransportException)
public
  constructor Create( const Msg: string);
end;

// One concrete exception class per TTransportException.TExceptionType value.
TTransportExceptionUnknown     = class( TTransportExceptionSpecialized);
TTransportExceptionNotOpen     = class( TTransportExceptionSpecialized);
TTransportExceptionAlreadyOpen = class( TTransportExceptionSpecialized);
TTransportExceptionTimedOut    = class( TTransportExceptionSpecialized);
TTransportExceptionEndOfFile   = class( TTransportExceptionSpecialized);
TTransportExceptionBadArgs     = class( TTransportExceptionSpecialized);
TTransportExceptionInterrupted = class( TTransportExceptionSpecialized);
120 | ||
// Secure protocol versions selectable for HTTP clients (see
// IHTTPClient.SecureProtocols), plus the matching set type.
TSecureProtocol = (
  SSL_2, SSL_3, TLS_1,    // outdated, for compatibility only
  TLS_1_1, TLS_1_2        // secure (as of today)
);

TSecureProtocols = set of TSecureProtocol;
127 | ||
// ITransport extension for HTTP based clients: exposes the individual
// timeouts, custom request headers, the allowed secure protocols and an
// explicit SendRequest call.
IHTTPClient = interface( ITransport)
  ['{7BF615DD-8680-4004-A5B2-88947BA3BA3D}']
  procedure SetDnsResolveTimeout( const Value: Integer);
  function GetDnsResolveTimeout: Integer;
  procedure SetConnectionTimeout( const Value: Integer);
  function GetConnectionTimeout: Integer;
  procedure SetSendTimeout( const Value: Integer);
  function GetSendTimeout: Integer;
  procedure SetReadTimeout( const Value: Integer);
  function GetReadTimeout: Integer;
  function GetCustomHeaders: IThriftDictionary<string,string>;
  procedure SendRequest;
  function GetSecureProtocols: TSecureProtocols;
  procedure SetSecureProtocols( const value: TSecureProtocols);

  // property view onto the accessors declared above
  property DnsResolveTimeout: Integer read GetDnsResolveTimeout write SetDnsResolveTimeout;
  property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
  property SendTimeout: Integer read GetSendTimeout write SetSendTimeout;
  property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
  property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
  property SecureProtocols: TSecureProtocols read GetSecureProtocols write SetSecureProtocols;
end;
150 | ||
// Server side endpoint: starts listening, hands out one ITransport per
// accepted peer and can be closed again. Accept receives an optional
// callback (fnAccepting).
IServerTransport = interface
  ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
  procedure Listen;
  procedure Close;
  function Accept( const fnAccepting: TProc): ITransport;
end;
157 | ||
// Abstract base class for IServerTransport implementations; all three
// operations have to be provided by the descendant.
TServerTransportImpl = class( TInterfacedObject, IServerTransport)
protected
  procedure Listen; virtual; abstract;
  procedure Close; virtual; abstract;
  function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
end;
164 | ||
// Factory that derives the transport to be used from a given base transport.
ITransportFactory = interface
  ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
  function GetTransport( const ATrans: ITransport): ITransport;
end;
169 | ||
// Default factory; its GetTransport (implemented in this unit) hands the
// given transport back unchanged. Descendants override it to add a wrapper.
TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
  function GetTransport( const ATrans: ITransport): ITransport; virtual;
end;
173 | ||
// Thrift stream on top of a TCP client socket.
// With OLD_SOCKETS the client is a TCustomIpClient and the class carries its
// own timeout plus Select/WaitForData helpers; otherwise a TSocket is used.
TTcpSocketStreamImpl = class( TThriftStreamImpl)
{$IFDEF OLD_SOCKETS}
private type
  TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
private
  FTcpClient : TCustomIpClient;
  FTimeout : Integer;
  function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
                   TimeOut: Integer; var wsaError: Integer): Integer;
  function WaitForData( TimeOut: Integer; pBuf: Pointer; DesiredBytes: Integer;
                        var wsaError, bytesReady: Integer): TWaitForData;
{$ELSE}
  FTcpClient: TSocket;
protected const
  SLEEP_TIME = 200;
{$ENDIF}
protected
  procedure Write( const pBuf: Pointer; offset, count: Integer); override;
  function Read( const pBuf: Pointer; const buflen: Integer; offset, count: Integer): Integer; override;
  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;

  function IsOpen: Boolean; override;
  function ToArray: TBytes; override;
public
  {$IFDEF OLD_SOCKETS}
  constructor Create( const ATcpClient: TCustomIpClient; const aTimeout: Integer = 0);
  {$ELSE}
  constructor Create( const ATcpClient: TSocket; const aTimeout: Longword = 0);
  {$ENDIF}
end;
206 | ||
// Transport that exposes the input and output streams it works on.
IStreamTransport = interface( ITransport)
  ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
  function GetInputStream: IThriftStream;
  function GetOutputStream: IThriftStream;
  property InputStream: IThriftStream read GetInputStream;
  property OutputStream: IThriftStream read GetOutputStream;
end;
214 | ||
// Transport backed by a pair of IThriftStream references, one for reading
// and one for writing (callers may pass the same stream for both).
TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
protected
  FInputStream : IThriftStream;
  FOutputStream : IThriftStream;
protected
  function GetIsOpen: Boolean; override;

  function GetInputStream: IThriftStream;
  function GetOutputStream: IThriftStream;
public
  property InputStream: IThriftStream read GetInputStream;
  property OutputStream: IThriftStream read GetOutputStream;

  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;
  function Read( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; override;
  procedure Write( const pBuf: Pointer; off, len: Integer); override;
  constructor Create( const AInputStream: IThriftStream; const AOutputStream: IThriftStream);
  destructor Destroy; override;
end;
236 | ||
// Stream decorator that places a read buffer and a write buffer (both
// TMemoryStream) in front of another IThriftStream. FBufSize is the chunk
// size used for refills and the threshold that triggers a flush on Write.
TBufferedStreamImpl = class( TThriftStreamImpl)
private
  FStream : IThriftStream;
  FBufSize : Integer;
  FReadBuffer : TMemoryStream;
  FWriteBuffer : TMemoryStream;
protected
  procedure Write( const pBuf: Pointer; offset, count: Integer); override;
  function Read( const pBuf: Pointer; const buflen: Integer; offset, count: Integer): Integer; override;
  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;
  function IsOpen: Boolean; override;
  function ToArray: TBytes; override;
public
  constructor Create( const AStream: IThriftStream; ABufSize: Integer);
  destructor Destroy; override;
end;
255 | ||
// Socket based IServerTransport.
// Wraps either a caller supplied server object or one it creates itself for
// a given port; FOwnsServer records whether the destructor has to free it.
TServerSocketImpl = class( TServerTransportImpl)
private
  {$IFDEF OLD_SOCKETS}
  FServer : TTcpServer;
  FPort : Integer;
  FClientTimeout : Integer;
  {$ELSE}
  FServer: TServerSocket;
  {$ENDIF}
  FUseBufferedSocket : Boolean;
  FOwnsServer : Boolean;
protected
  function Accept( const fnAccepting: TProc): ITransport; override;
public
  {$IFDEF OLD_SOCKETS}
  constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
  constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
  {$ELSE}
  constructor Create( const AServer: TServerSocket; AClientTimeout: Longword = 0); overload;
  constructor Create( APort: Integer; AClientTimeout: Longword = 0; AUseBufferedSockets: Boolean = FALSE); overload;
  {$ENDIF}
  destructor Destroy; override;
  procedure Listen; override;
  procedure Close; override;
end;
281 | ||
// Transport that wraps an IStreamTransport and keeps its own input and
// output buffer streams of FBufSize in front of it.
TBufferedTransportImpl = class( TTransportImpl)
private
  FInputBuffer : IThriftStream;
  FOutputBuffer : IThriftStream;
  FTransport : IStreamTransport;
  FBufSize : Integer;

  procedure InitBuffers;
  function GetUnderlyingTransport: ITransport;
protected
  function GetIsOpen: Boolean; override;
  procedure Flush; override;
public
  procedure Open(); override;
  procedure Close(); override;
  function Read( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; override;
  procedure Write( const pBuf: Pointer; off, len: Integer); override;
  constructor Create( const ATransport: IStreamTransport); overload;
  constructor Create( const ATransport: IStreamTransport; ABufSize: Integer); overload;
  property UnderlyingTransport: ITransport read GetUnderlyingTransport;
  property IsOpen: Boolean read GetIsOpen;
end;
304 | ||
// Client socket transport.
// Either adopts an existing client object (optionally taking ownership) or
// creates its own from host and port. FOwnsClient decides whether Close and
// Destroy free the client object.
TSocketImpl = class( TStreamTransportImpl)
private
  {$IFDEF OLD_SOCKETS}
  FClient : TCustomIpClient;
  {$ELSE}
  FClient: TSocket;
  {$ENDIF}
  FOwnsClient : Boolean;
  FHost : string;
  FPort : Integer;
  {$IFDEF OLD_SOCKETS}
  FTimeout : Integer;
  {$ELSE}
  FTimeout : Longword;
  {$ENDIF}

  procedure InitSocket;
protected
  function GetIsOpen: Boolean; override;
public
  procedure Open; override;
  {$IFDEF OLD_SOCKETS}
  constructor Create( const AClient: TCustomIpClient; aOwnsClient: Boolean; ATimeout: Integer = 0); overload;
  constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
  {$ELSE}
  constructor Create( const AClient: TSocket; aOwnsClient: Boolean); overload;
  constructor Create( const AHost: string; APort: Integer; ATimeout: Longword = 0); overload;
  {$ENDIF}
  destructor Destroy; override;
  procedure Close; override;
  {$IFDEF OLD_SOCKETS}
  property TcpClient: TCustomIpClient read FClient;
  {$ELSE}
  property TcpClient: TSocket read FClient;
  {$ENDIF}
  property Host: string read FHost;
  property Port: Integer read FPort;
end;
343 | ||
// Transport that wraps another ITransport and stages data in separate read
// and write memory streams (see ReadFrame / InitWriteBuffer). The nested
// TFactory produces instances through the ITransportFactory contract.
// NOTE(review): FHeaderSize = 4 presumably is the byte size of a frame
// length prefix - confirm against the implementation of ReadFrame/Flush.
TFramedTransportImpl = class( TTransportImpl)
private const
  FHeaderSize : Integer = 4;
private class var
  FHeader_Dummy : array of Byte;
protected
  FTransport : ITransport;
  FWriteBuffer : TMemoryStream;
  FReadBuffer : TMemoryStream;

  procedure InitWriteBuffer;
  procedure ReadFrame;
public
  type
    TFactory = class( TTransportFactoryImpl)
    public
      function GetTransport( const ATrans: ITransport): ITransport; override;
    end;

  {$IFDEF HAVE_CLASS_CTOR}
  class constructor Create;
  {$ENDIF}

  constructor Create; overload;
  constructor Create( const ATrans: ITransport); overload;
  destructor Destroy; override;

  procedure Open(); override;
  function GetIsOpen: Boolean; override;

  procedure Close(); override;
  function Read( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; override;
  procedure Write( const pBuf: Pointer; off, len: Integer); override;
  procedure Flush; override;
end;
379 | ||
380 | {$IFNDEF HAVE_CLASS_CTOR} | |
381 | procedure TFramedTransportImpl_Initialize; | |
382 | {$ENDIF} | |
383 | ||
const
  // default timeout, expressed in milliseconds
  DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms

  // secure protocols that are enabled by default
  DEFAULT_THRIFT_SECUREPROTOCOLS = [ TSecureProtocol.TLS_1_1, TSecureProtocol.TLS_1_2];
387 | ||
388 | ||
389 | ||
390 | implementation | |
391 | ||
392 | { TTransportImpl } | |
393 | ||
// Default implementation: the base class holds no pending output, so there
// is nothing to push out. Virtual, so descendants can override it.
procedure TTransportImpl.Flush;
begin
  // intentionally empty
end;
398 | ||
// Default implementation: simply reports whether the transport is open.
function TTransportImpl.Peek: Boolean;
begin
  // IsOpen is a property that reads GetIsOpen
  Result := GetIsOpen;
end;
403 | ||
// TBytes convenience wrapper around the pointer based Read.
// The array length is forwarded as buflen; an empty array yields 0 without
// calling the pointer based overload.
function TTransportImpl.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
var bufSize : Integer;
begin
  bufSize := Length(buf);
  if bufSize <= 0 then begin
    Result := 0;
    Exit;
  end;

  Result := Read( @buf[0], bufSize, off, len);
end;
410 | ||
// TBytes convenience wrapper around the pointer based ReadAll.
// The array length is forwarded as buflen; an empty array yields 0 without
// calling the pointer based overload.
function TTransportImpl.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
var bufSize : Integer;
begin
  bufSize := Length(buf);
  if bufSize <= 0 then begin
    Result := 0;
    Exit;
  end;

  Result := ReadAll( @buf[0], bufSize, off, len);
end;
417 | ||
// Writes the complete array; an empty array is silently ignored.
procedure TTransportImpl.Write( const buf: TBytes);
var count : Integer;
begin
  count := Length(buf);
  if count > 0
  then Write( @buf[0], 0, count);
end;
423 | ||
// Writes len bytes of buf, starting at index off.
//
// An empty array remains a silent no-op. For a non-empty array the
// requested range is validated first, because the pointer based Write
// receives no buffer size and therefore cannot detect an access beyond the
// end of the array on its own.
//
// Raises TTransportExceptionBadArgs if off or len is negative or if
// off + len exceeds Length(buf).
procedure TTransportImpl.Write( const buf: TBytes; off: Integer; len: Integer);
var bufSize : Integer;
begin
  bufSize := Length(buf);
  if bufSize = 0
  then Exit;

  // written as "len > bufSize - off" so the check itself cannot overflow
  if (off < 0) or (len < 0) or (len > bufSize - off)
  then raise TTransportExceptionBadArgs.Create( 'Write: offset/length exceed the buffer size');

  Write( @buf[0], off, len);
end;
429 | ||
// Keeps calling Read until exactly len bytes have arrived and returns the
// number of bytes read. Every call continues right behind the bytes that
// were already received.
// Raises TTransportExceptionNotOpen as soon as a Read call delivers nothing.
function TTransportImpl.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
var got : Integer;
begin
  Result := 0;
  while Result < len do begin
    got := Read( pBuf, buflen, off + Result, len - Result);
    if got <= 0
    then raise TTransportExceptionNotOpen.Create( 'Cannot read, Remote side has closed' );
    Inc( Result, got);
  end;
end;
441 | ||
// Shortcut for the pointer based Write with an offset of zero.
procedure TTransportImpl.Write( const pBuf : Pointer; len : Integer);
begin
  Write( pBuf, 0, len);
end;
446 | ||
447 | { TTransportException } | |
448 | ||
// Maps the runtime class of this exception to its TExceptionType value.
// Anything that is none of the listed specialized classes counts as Unknown.
function TTransportException.GetType: TExceptionType;
begin
  Result := TExceptionType.Unknown;

  if Self is TTransportExceptionNotOpen
  then Result := TExceptionType.NotOpen
  else if Self is TTransportExceptionAlreadyOpen
  then Result := TExceptionType.AlreadyOpen
  else if Self is TTransportExceptionTimedOut
  then Result := TExceptionType.TimedOut
  else if Self is TTransportExceptionEndOfFile
  then Result := TExceptionType.EndOfFile
  else if Self is TTransportExceptionBadArgs
  then Result := TExceptionType.BadArgs
  else if Self is TTransportExceptionInterrupted
  then Result := TExceptionType.Interrupted;
end;
459 | ||
// Protected constructor that forwards the message to the inherited Create.
// Used by the specialized descendants, because the public Create members of
// this class are deprecated class functions.
constructor TTransportException.HiddenCreate(const Msg: string);
begin
  inherited Create( Msg);
end;
464 | ||
// Deprecated factory: same as Create(AType, msg) with an empty message.
class function TTransportException.Create(AType: TExceptionType): TTransportException;
begin
  // deliberately no inherited call, this is a class function
  // the overload called here is deprecated too, so mute that warning locally
  {$WARN SYMBOL_DEPRECATED OFF}
  Result := Create( AType, '');
  {$WARN SYMBOL_DEPRECATED DEFAULT}
end;
472 | ||
// Deprecated factory: creates the specialized exception class that belongs
// to AType. Unknown and any value without a branch of its own produce a
// TTransportExceptionUnknown.
class function TTransportException.Create(AType: TExceptionType;
  const msg: string): TTransportException;
begin
  case AType of
    TExceptionType.NotOpen :
      Result := TTransportExceptionNotOpen.Create( msg);
    TExceptionType.AlreadyOpen :
      Result := TTransportExceptionAlreadyOpen.Create( msg);
    TExceptionType.TimedOut :
      Result := TTransportExceptionTimedOut.Create( msg);
    TExceptionType.EndOfFile :
      Result := TTransportExceptionEndOfFile.Create( msg);
    TExceptionType.BadArgs :
      Result := TTransportExceptionBadArgs.Create( msg);
    TExceptionType.Interrupted :
      Result := TTransportExceptionInterrupted.Create( msg);
  else
    Result := TTransportExceptionUnknown.Create( msg);
  end;
end;
487 | ||
// Deprecated factory: a message without a type gives a
// TTransportExceptionUnknown.
class function TTransportException.Create(const msg: string): TTransportException;
begin
  Result := TTransportExceptionUnknown.Create( msg);
end;
492 | ||
493 | { TTransportExceptionSpecialized } | |
494 | ||
// Regular constructor of the specialized exceptions; delegates to the
// protected HiddenCreate of the base class.
constructor TTransportExceptionSpecialized.Create(const Msg: string);
begin
  inherited HiddenCreate( Msg);
end;
499 | ||
500 | { TTransportFactoryImpl } | |
501 | ||
// Default factory behaviour: hands the given transport back unchanged.
function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
begin
  Result := ATrans;
end;
506 | ||
507 | { TServerSocket } | |
508 | ||
// Wraps an already existing server object.
// FOwnsServer is not set here, so the destructor leaves AServer alone.
// OLD_SOCKETS: the timeout is only remembered for accepted clients;
// otherwise it is applied to AServer as receive and send timeout.
{$IFDEF OLD_SOCKETS}
constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
{$ELSE}
constructor TServerSocketImpl.Create( const AServer: TServerSocket; AClientTimeout: Longword);
{$ENDIF}
begin
  inherited Create;
  FServer := AServer;
  {$IFDEF OLD_SOCKETS}
  FClientTimeout := AClientTimeout;
  {$ELSE}
  FServer.RecvTimeout := AClientTimeout;
  FServer.SendTimeout := AClientTimeout;
  {$ENDIF}
end;
525 | ||
// Creates and owns a new server object for the given port.
// AUseBufferedSockets decides whether Accept wraps each client transport
// into a TBufferedTransportImpl.
{$IFDEF OLD_SOCKETS}
constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
{$ELSE}
constructor TServerSocketImpl.Create(APort: Integer; AClientTimeout: Longword; AUseBufferedSockets: Boolean);
{$ENDIF}
begin
  inherited Create;

  {$IFDEF OLD_SOCKETS}
  FPort := APort;
  FClientTimeout := AClientTimeout;

  FServer := TTcpServer.Create( nil);
  FServer.BlockMode := bmBlocking;
  // LocalPort takes a string; its exact string type depends on the compiler
  {$IF CompilerVersion >= 21.0}
  FServer.LocalPort := AnsiString( IntToStr( FPort));
  {$ELSE}
  FServer.LocalPort := IntToStr( FPort);
  {$IFEND}
  {$ELSE}
  // the client timeout is passed for both timeout arguments
  FServer := TServerSocket.Create( APort, AClientTimeout, AClientTimeout);
  {$ENDIF}

  FUseBufferedSocket := AUseBufferedSockets;
  FOwnsServer := True;
end;
549 | ||
// Releases the server object, but only if this instance created it.
destructor TServerSocketImpl.Destroy;
begin
  if FOwnsServer
  then FreeAndNil( FServer);

  inherited;
end;
558 | ||
// Accepts the next client and returns a transport for it.
//
// fnAccepting, when assigned, is called right before the accept call.
// The accepted client is handed to a TSocketImpl that owns it; if buffered
// sockets were requested that transport is wrapped into a
// TBufferedTransportImpl.
//
// Raises TTransportExceptionNotOpen when there is no server object.
// OLD_SOCKETS: returns nil when accepting fails, and turns every exception
// into a TTransportExceptionUnknown. Otherwise exceptions are re-raised
// unchanged after the client has been freed.
function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
var
  {$IFDEF OLD_SOCKETS}
  peer : TCustomIpClient;
  {$ELSE}
  peer : TSocket;
  {$ENDIF}
  peerTransport : IStreamTransport;
begin
  if not Assigned( FServer)
  then raise TTransportExceptionNotOpen.Create('No underlying server socket.');

  {$IFDEF OLD_SOCKETS}
  peer := nil;
  try
    peer := TCustomIpClient.Create(nil);

    if Assigned( fnAccepting)
    then fnAccepting();

    if not FServer.Accept( peer) then begin
      peer.Free;
      Result := nil;
      Exit;
    end;

    if not Assigned( peer) then begin
      Result := nil;
      Exit;
    end;

    peerTransport := TSocketImpl.Create( peer, TRUE, FClientTimeout);
    peer := nil;  // from here on the transport owns the client

    if FUseBufferedSocket
    then Result := TBufferedTransportImpl.Create( peerTransport)
    else Result := peerTransport;

  except
    on E: Exception do begin
      peer.Free;
      raise TTransportExceptionUnknown.Create(E.ToString);
    end;
  end;
  {$ELSE}
  if Assigned( fnAccepting)
  then fnAccepting();

  peer := FServer.Accept;
  try
    peerTransport := TSocketImpl.Create( peer, True);
    peer := nil;  // from here on the transport owns the client

    if FUseBufferedSocket
    then Result := TBufferedTransportImpl.Create( peerTransport)
    else Result := peerTransport;
  except
    peer.Free;
    raise;
  end;
  {$ENDIF}
end;
623 | ||
// Puts the server object into listening state; does nothing without one.
// OLD_SOCKETS: any failure is re-raised as TTransportExceptionUnknown.
procedure TServerSocketImpl.Listen;
begin
  if not Assigned( FServer)
  then Exit;

  {$IFDEF OLD_SOCKETS}
  try
    FServer.Active := True;
  except
    on E: Exception
    do raise TTransportExceptionUnknown.Create('Could not accept on listening socket: ' + E.Message);
  end;
  {$ELSE}
  FServer.Listen;
  {$ENDIF}
end;
640 | ||
// Closes the server object; does nothing without one.
// OLD_SOCKETS: any failure is re-raised as TTransportExceptionUnknown.
procedure TServerSocketImpl.Close;
begin
  if not Assigned( FServer)
  then Exit;

  {$IFDEF OLD_SOCKETS}
  try
    FServer.Active := False;
  except
    on E: Exception
    do raise TTransportExceptionUnknown.Create('Error on closing socket : ' + E.Message);
  end;
  {$ELSE}
  FServer.Close;
  {$ENDIF}
end;
655 | ||
656 | { TSocket } | |
657 | ||
// Creates a transport on top of an existing client object.
//
// AClient      the client to read from / write to
// aOwnsClient  True if this transport shall free AClient in Close/Destroy
// ATimeout     (OLD_SOCKETS only) timeout passed on to the socket stream;
//              otherwise AClient.RecvTimeout is used
//
// Ownership is taken only after construction has succeeded. When a
// constructor raises, Delphi runs the destructor automatically; with
// FOwnsClient already set that destructor would free AClient although the
// caller still holds it and frees it again in its own exception handler
// (see TServerSocketImpl.Accept). Until the final assignment FOwnsClient
// keeps its initial value False.
{$IFDEF OLD_SOCKETS}
constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
var stream : IThriftStream;
begin
  FClient := AClient;
  FTimeout := ATimeout;
  stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  inherited Create( stream, stream);
  FOwnsClient := aOwnsClient;  // nothing can fail anymore, take ownership
end;
{$ELSE}
constructor TSocketImpl.Create(const AClient: TSocket; aOwnsClient: Boolean);
var stream : IThriftStream;
begin
  FClient := AClient;
  FTimeout := AClient.RecvTimeout;
  stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  inherited Create( stream, stream);
  FOwnsClient := aOwnsClient;  // nothing can fail anymore, take ownership
end;
{$ENDIF}
679 | ||
// Creates a transport for the given host and port. The client object and
// its stream are set up right away by InitSocket; Open makes the connection.
{$IFDEF OLD_SOCKETS}
constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
{$ELSE}
constructor TSocketImpl.Create(const AHost: string; APort: Integer; ATimeout: Longword);
{$ENDIF}
begin
  // no streams yet, InitSocket provides them
  inherited Create( nil, nil);

  FHost := AHost;
  FPort := APort;
  FTimeout := ATimeout;

  InitSocket;
end;
692 | ||
// Frees the client object if it is owned by this transport.
destructor TSocketImpl.Destroy;
begin
  if FOwnsClient then begin
    FreeAndNil( FClient);
  end;

  inherited;
end;
699 | ||
// Closes the transport: drops both streams and lets go of the client
// object, freeing it when it is owned by this transport.
procedure TSocketImpl.Close;
begin
  inherited Close;

  FInputStream := nil;
  FOutputStream := nil;

  if FOwnsClient
  then FreeAndNil( FClient);
  FClient := nil;  // covers the not-owned case; harmless after FreeAndNil
end;
711 | ||
// Open means: there is a client object and it reports being connected/open.
function TSocketImpl.GetIsOpen: Boolean;
begin
  {$IFDEF OLD_SOCKETS}
  Result := Assigned( FClient) and FClient.Connected;
  {$ELSE}
  Result := Assigned( FClient) and FClient.IsOpen;
  {$ENDIF}
end;
720 | ||
// Replaces the current client object by a fresh, owned one and binds a new
// socket stream to it, used for input and output alike.
procedure TSocketImpl.InitSocket;
var
  sockStream : IThriftStream;
begin
  // let go of the previous client, freeing it only when it belongs to us
  if FOwnsClient
  then FreeAndNil( FClient);
  FClient := nil;

  {$IFDEF OLD_SOCKETS}
  FClient := TTcpClient.Create( nil);
  {$ELSE}
  FClient := TSocket.Create( FHost, FPort);
  {$ENDIF}
  FOwnsClient := True;

  sockStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  FInputStream := sockStream;
  FOutputStream := sockStream;
end;
740 | ||
// Connects to Host:Port and installs a fresh socket stream for both
// directions.
//
// Raises TTransportExceptionAlreadyOpen if the socket is connected already,
// TTransportExceptionNotOpen if the host is empty or the port is not
// positive.
procedure TSocketImpl.Open;
begin
  if IsOpen
  then raise TTransportExceptionAlreadyOpen.Create('Socket already connected');

  if FHost = ''
  then raise TTransportExceptionNotOpen.Create('Cannot open null host');

  if Port <= 0
  then raise TTransportExceptionNotOpen.Create('Cannot open without port');

  // e.g. after Close there is no client object anymore
  if not Assigned( FClient)
  then InitSocket;

  {$IFDEF OLD_SOCKETS}
  FClient.RemoteHost := TSocketHost( Host);
  FClient.RemotePort := TSocketPort( IntToStr( Port));
  FClient.Connect;
  {$ELSE}
  FClient.Open;
  {$ENDIF}

  FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  FOutputStream := FInputStream;
end;
769 | ||
770 | { TBufferedStream } | |
771 | ||
// Flushes pending output, then lets go of the wrapped stream and frees both
// buffers.
//
// The cleanup sits in a finally block: Flush writes to the wrapped stream
// and may raise, and Close is also called from Destroy. Without the finally
// block a failing Flush would leave the two memory streams allocated and
// the stream reference in place. The exception itself still propagates.
procedure TBufferedStreamImpl.Close;
begin
  try
    Flush;
  finally
    FStream := nil;
    FreeAndNil( FReadBuffer);
    FreeAndNil( FWriteBuffer);
  end;
end;
783 | ||
// Wraps AStream and allocates the (initially empty) read and write buffers.
// ABufSize is the refill chunk size and the flush threshold.
constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
begin
  inherited Create;

  FStream  := AStream;
  FBufSize := ABufSize;

  FReadBuffer  := TMemoryStream.Create;
  FWriteBuffer := TMemoryStream.Create;
end;
792 | ||
// Closes the stream (which also releases the buffers) before the inherited
// cleanup takes place.
destructor TBufferedStreamImpl.Destroy;
begin
  Close;
  inherited;
end;
798 | ||
// Hands everything collected in the write buffer over to the wrapped stream
// and empties the write buffer. Does nothing while the stream is not open.
procedure TBufferedStreamImpl.Flush;
var
  pending : TBytes;
  size    : Integer;
begin
  if not IsOpen
  then Exit;

  size := FWriteBuffer.Size;
  if size > 0 then begin
    // copy the buffered bytes into an array the wrapped stream can take
    SetLength( pending, size);
    FWriteBuffer.Position := 0;
    FWriteBuffer.Read( pending[0], size);
    FStream.Write( pending, 0, size);
  end;

  FWriteBuffer.Clear;
end;
815 | ||
// Open means: both buffers exist, a wrapped stream is set and that stream
// reports being open itself.
function TBufferedStreamImpl.IsOpen: Boolean;
begin
  Result := Assigned( FWriteBuffer)
        and Assigned( FReadBuffer)
        and Assigned( FStream)
        and FStream.IsOpen;
end;
823 | ||
// Opening is delegated to the wrapped stream.
procedure TBufferedStreamImpl.Open;
begin
  FStream.Open;
end;
828 | ||
// Reads up to count bytes into pBuf, starting at the given offset, and
// returns the number of bytes delivered.
//
// Data is served from the read buffer. Whenever the buffer is used up it is
// refilled with up to FBufSize bytes from the wrapped stream. The loop ends
// when count bytes were delivered or the wrapped stream returns no data.
// Returns 0 while the stream is not open.
function TBufferedStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
var
  chunk  : Integer;
  refill : TBytes;
  pDest  : PByte;
begin
  inherited;
  Result := 0;

  if not IsOpen
  then Exit;

  while count > 0 do begin

    // everything buffered so far was handed out: fetch the next chunk
    if FReadBuffer.Position >= FReadBuffer.Size then begin
      FReadBuffer.Clear;
      SetLength( refill, FBufSize);
      chunk := FStream.Read( refill, 0, FBufSize);
      if chunk = 0
      then Break;  // nothing more to get, avoid looping forever

      FReadBuffer.WriteBuffer( refill[0], chunk);
      FReadBuffer.Position := 0;
    end;

    // serve as much of the request as the buffer currently holds
    if FReadBuffer.Position < FReadBuffer.Size then begin
      chunk := Min( FReadBuffer.Size - FReadBuffer.Position, count);
      pDest := pBuf;
      Inc( pDest, offset);
      Inc( Result, FReadBuffer.Read( pDest^, chunk));
      Dec( count, chunk);
      Inc( offset, chunk);
    end;
  end;
end;
862 | ||
// Returns a copy of the complete content of the read buffer, or an empty
// array if the stream is not open. Note that the read buffer position is
// rewound to 0 before copying, so the position changes as a side effect.
function TBufferedStreamImpl.ToArray: TBytes;
var cbData : Integer;
begin
  if IsOpen
  then cbData := FReadBuffer.Size
  else cbData := 0;

  SetLength( Result, cbData);
  if cbData <= 0 then Exit;

  FReadBuffer.Position := 0;
  FReadBuffer.Read( Pointer(@Result[0])^, cbData);
end;
879 | ||
// Appends "count" bytes, taken "offset" bytes behind pBuf, to the write
// buffer. Once the write buffer holds more than FBufSize bytes it is flushed.
// Empty writes and writes to a stream that is not open are ignored.
procedure TBufferedStreamImpl.Write( const pBuf : Pointer; offset: Integer; count: Integer);
var pSrc : PByte;
begin
  inherited;
  if (count <= 0) or not IsOpen then Exit;

  pSrc := pBuf;
  Inc( pSrc, offset);
  FWriteBuffer.Write( pSrc^, count);

  if FWriteBuffer.Size > FBufSize
  then Flush;
end;
895 | ||
896 | { TStreamTransportImpl } | |
897 | ||
// Creates a transport on top of the given streams. Either stream is stored
// as passed; Read/Write/Flush raise later on if the stream they need is nil.
constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
begin
  inherited Create;
  FOutputStream := AOutputStream;
  FInputStream  := AInputStream;
end;
904 | ||
// Drops both stream references before running the inherited destructor.
destructor TStreamTransportImpl.Destroy;
begin
  FOutputStream := nil;
  FInputStream  := nil;
  inherited;
end;
911 | ||
// Closing simply drops both stream references.
procedure TStreamTransportImpl.Close;
begin
  FOutputStream := nil;
  FInputStream  := nil;
end;
917 | ||
// Flushes the output stream.
// Raises TTransportExceptionNotOpen if there is no output stream.
procedure TStreamTransportImpl.Flush;
begin
  if FOutputStream <> nil
  then FOutputStream.Flush
  else raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );
end;
926 | ||
// Getter: the stream this transport reads from (may be nil).
function TStreamTransportImpl.GetInputStream: IThriftStream;
begin
  Result := FInputStream;
end;
931 | ||
// Always reports "open", regardless of whether the streams are assigned.
function TStreamTransportImpl.GetIsOpen: Boolean;
begin
  Result := TRUE;
end;
936 | ||
// Getter: the stream this transport writes to (may be nil).
function TStreamTransportImpl.GetOutputStream: IThriftStream;
begin
  Result := FOutputStream;
end;
941 | ||
// Intentionally empty: this class has nothing to do on Open.
procedure TStreamTransportImpl.Open;
begin
  // nothing to do
end;
946 | ||
// Forwards the read request to the input stream and returns its result.
// Raises TTransportExceptionNotOpen if there is no input stream.
function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
  if FInputStream <> nil
  then Result := FInputStream.Read( pBuf, buflen, off, len)
  else raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
end;
955 | ||
// Forwards the write request to the output stream.
// Raises TTransportExceptionNotOpen if there is no output stream.
procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
  if FOutputStream <> nil
  then FOutputStream.Write( pBuf, off, len)
  else raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );
end;
964 | ||
965 | { TBufferedTransportImpl } | |
966 | ||
// Convenience constructor: wraps ATransport using a buffer size of 1024.
constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
const
  DEFAULT_BUFSIZE = 1024;
begin
  // no "inherited" here, the overloaded constructor takes care of that
  Create( ATransport, DEFAULT_BUFSIZE);
end;
972 | ||
// Wraps ATransport and sets up the buffered streams (see InitBuffers)
// with a buffer size of ABufSize.
constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport; ABufSize: Integer);
begin
  inherited Create;
  FBufSize   := ABufSize;
  FTransport := ATransport;
  InitBuffers;  // needs both fields above
end;
980 | ||
// Closes the wrapped transport, then drops both buffered streams.
// The output buffer is not flushed here.
procedure TBufferedTransportImpl.Close;
begin
  FTransport.Close;
  FOutputBuffer := nil;
  FInputBuffer  := nil;
end;
987 | ||
// Flushes the output buffer, if there is one.
procedure TBufferedTransportImpl.Flush;
begin
  if FOutputBuffer = nil then Exit;
  FOutputBuffer.Flush;
end;
994 | ||
// The open state is that of the wrapped transport.
function TBufferedTransportImpl.GetIsOpen: Boolean;
begin
  Result := FTransport.IsOpen;
end;
999 | ||
// Getter: the transport wrapped by this buffered transport.
function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
begin
  Result := FTransport;
end;
1004 | ||
// Creates a buffered wrapper for each stream the wrapped transport offers.
// A buffer whose stream is nil is left as it is.
procedure TBufferedTransportImpl.InitBuffers;
begin
  if FTransport.InputStream <> nil
  then FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize);

  if FTransport.OutputStream <> nil
  then FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize);
end;
1014 | ||
// Opens the wrapped transport and re-creates the buffers afterwards, so
// that they refer to the streams the transport offers after opening.
procedure TBufferedTransportImpl.Open;
begin
  FTransport.Open;
  InitBuffers;
end;
1020 | ||
// Reads via the input buffer; returns 0 if there is no input buffer.
function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
  if FInputBuffer <> nil
  then Result := FInputBuffer.Read( pBuf, buflen, off, len)
  else Result := 0;
end;
1028 | ||
// Writes via the output buffer; silently ignored if there is no output buffer.
procedure TBufferedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
  if FOutputBuffer = nil then Exit;
  FOutputBuffer.Write( pBuf, off, len);
end;
1035 | ||
1036 | { TFramedTransportImpl } | |
1037 | ||
// One-time setup of FHeader_Dummy: FHeaderSize zero bytes. InitWriteBuffer
// writes them to the write buffer as a placeholder for the frame header.
// Depending on HAVE_CLASS_CTOR this is done by a class constructor or by a
// plain procedure.
{$IFDEF HAVE_CLASS_CTOR}
class constructor TFramedTransportImpl.Create;
begin
  SetLength( FHeader_Dummy, FHeaderSize);
  FillChar( FHeader_Dummy[0], SizeOf( Byte) * Length( FHeader_Dummy), 0);
end;
{$ELSE}
procedure TFramedTransportImpl_Initialize;
begin
  SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
  FillChar( TFramedTransportImpl.FHeader_Dummy[0],
            SizeOf( Byte) * Length( TFramedTransportImpl.FHeader_Dummy),
            0);
end;
{$ENDIF}
1052 | ||
// Creates a framed transport without an underlying transport;
// only the write buffer is prepared.
constructor TFramedTransportImpl.Create;
begin
  inherited Create;
  InitWriteBuffer;
end;
1058 | ||
// Closing is delegated to the underlying transport.
procedure TFramedTransportImpl.Close;
begin
  FTransport.Close;
end;
1063 | ||
// Creates a framed transport on top of ATrans and prepares the write buffer.
constructor TFramedTransportImpl.Create( const ATrans: ITransport);
begin
  inherited Create;
  FTransport := ATrans;
  InitWriteBuffer;
end;
1070 | ||
// Releases both memory streams before running the inherited destructor.
destructor TFramedTransportImpl.Destroy;
begin
  FReadBuffer.Free;
  FWriteBuffer.Free;
  inherited;
end;
1077 | ||
// Sends the buffered data as one frame: the first four bytes of the buffer
// (the header placeholder) are overwritten with the payload length, most
// significant byte first, then everything is written to the underlying
// transport, which is flushed as well.
// Raises TTransportExceptionUnknown if the buffer is shorter than the header.
procedure TFramedTransportImpl.Flush;
var
  frame     : TBytes;
  cbFrame   : Integer;
  cbPayload : Integer;
begin
  // take a copy of the complete write buffer
  cbFrame := FWriteBuffer.Size;
  SetLength( frame, cbFrame);
  if cbFrame > 0
  then System.Move( FWriteBuffer.Memory^, frame[0], cbFrame);

  cbPayload := cbFrame - FHeaderSize;
  if cbPayload < 0
  then raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: data_len < 0' );

  // start over with a fresh write buffer before anything is sent
  InitWriteBuffer;

  // payload length goes into the header bytes
  frame[0] := Byte((cbPayload shr 24) and $FF);
  frame[1] := Byte((cbPayload shr 16) and $FF);
  frame[2] := Byte((cbPayload shr  8) and $FF);
  frame[3] := Byte( cbPayload         and $FF);

  FTransport.Write( frame, 0, cbFrame);
  FTransport.Flush;
end;
1106 | ||
// The open state is that of the underlying transport.
function TFramedTransportImpl.GetIsOpen: Boolean;
begin
  Result := FTransport.IsOpen;
end;
1111 | ||
type
  // Descendant without any additions; InitWriteBuffer casts to it in order
  // to set the Capacity of a TMemoryStream.
  TAccessMemoryStream = class(TMemoryStream)
  end;

// Replaces the write buffer by a new memory stream with an initial capacity
// of 1024 bytes that already contains the header placeholder bytes.
procedure TFramedTransportImpl.InitWriteBuffer;
const
  INITIAL_CAPACITY = 1024;
begin
  FWriteBuffer.Free;
  FWriteBuffer := TMemoryStream.Create;
  TAccessMemoryStream(FWriteBuffer).Capacity := INITIAL_CAPACITY;
  FWriteBuffer.Write( FHeader_Dummy[0], FHeaderSize);
end;
1123 | ||
// Opening is delegated to the underlying transport.
procedure TFramedTransportImpl.Open;
begin
  FTransport.Open;
end;
1128 | ||
// Reads up to "len" bytes of frame payload into pBuf, starting "off" bytes
// behind pBuf. The request is limited to the space available behind "off"
// (buflen-off). Data left over from the current frame is served first; only
// when none is left the next frame is read from the underlying transport.
// Returns the number of bytes copied.
function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
var pTmp : PByte;
begin
  // never hand out more than fits into the caller's buffer
  if len > (buflen-off)
  then len := buflen-off;

  // Nothing requested, or nothing fits: return right away. Calling ReadFrame
  // here would replace FReadBuffer and thereby throw away unread frame data,
  // and it would wait on the transport for a frame nobody asked for.
  if len <= 0 then begin
    Result := 0;
    Exit;
  end;

  pTmp := pBuf;
  Inc( pTmp, off);

  // serve from the current frame first
  if FReadBuffer <> nil then begin
    Result := FReadBuffer.Read( pTmp^, len);
    if Result > 0 then Exit;
  end;

  // current frame used up (or no frame yet): get the next one
  ReadFrame;
  Result := FReadBuffer.Read( pTmp^, len);
end;
1150 | ||
// Reads one complete frame from the underlying transport: first the header
// (frame size, most significant byte first), then that many payload bytes.
// The payload replaces the content of FReadBuffer, positioned at its start.
// Raises TTransportExceptionUnknown if the header announces a negative size.
procedure TFramedTransportImpl.ReadFrame;
var
  i32rd : TBytes;
  size  : Integer;
  buff  : TBytes;
begin
  SetLength( i32rd, FHeaderSize );
  FTransport.ReadAll( i32rd, 0, FHeaderSize);
  size :=
    ((i32rd[0] and $FF) shl 24) or
    ((i32rd[1] and $FF) shl 16) or
    ((i32rd[2] and $FF) shl 8) or
     (i32rd[3] and $FF);

  // The size comes from the wire and has not been validated so far. A set
  // top bit yields a negative Integer, which must not reach SetLength/ReadAll.
  if size < 0 then begin
    raise TTransportExceptionUnknown.Create('TFramedTransport.ReadFrame: invalid frame size '+IntToStr(size));
  end;

  SetLength( buff, size );
  FTransport.ReadAll( buff, 0, size );

  // replace the previous frame by the new one
  FReadBuffer.Free;
  FReadBuffer := TMemoryStream.Create;
  if Length(buff) > 0
  then FReadBuffer.Write( Pointer(@buff[0])^, size );
  FReadBuffer.Position := 0;
end;
1172 | ||
// Appends "len" bytes, taken "off" bytes behind pBuf, to the write buffer.
// Nothing is sent to the underlying transport here; that is done by Flush.
procedure TFramedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
var pSrc : PByte;
begin
  if len <= 0 then Exit;

  pSrc := pBuf;
  Inc( pSrc, off);
  FWriteBuffer.Write( pSrc^, len);
end;
1183 | ||
1184 | { TFramedTransport.TFactory } | |
1185 | ||
// Factory method: wraps ATrans into a new framed transport.
function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
begin
  Result := TFramedTransportImpl.Create( ATrans);
end;
1190 | ||
1191 | { TTcpSocketStreamImpl } | |
1192 | ||
// Closing is delegated to the socket object.
procedure TTcpSocketStreamImpl.Close;
begin
  FTcpClient.Close;
end;
1197 | ||
{$IFDEF OLD_SOCKETS}
// Old sockets: remembers the client object and the timeout value.
constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
begin
  inherited Create;
  FTimeout   := aTimeout;
  FTcpClient := ATcpClient;
end;
{$ELSE}
// New sockets: remembers the socket and applies the timeout to it. For
// receiving, a timeout of 0 is replaced by SLEEP_TIME; the send timeout
// is always set to aTimeout as given.
constructor TTcpSocketStreamImpl.Create( const ATcpClient: TSocket; const aTimeout : Longword);
begin
  inherited Create;
  FTcpClient := ATcpClient;

  if aTimeout <> 0
  then FTcpClient.RecvTimeout := aTimeout
  else FTcpClient.RecvTimeout := SLEEP_TIME;

  FTcpClient.SendTimeout := aTimeout;
end;
{$ENDIF}
1217 | ||
// Intentionally empty: this class does nothing on Flush.
procedure TTcpSocketStreamImpl.Flush;
begin
  // nothing to do
end;
1222 | ||
// Reports the state of the socket object: "Active" with the old sockets
// implementation, "IsOpen" with the new one.
function TTcpSocketStreamImpl.IsOpen: Boolean;
begin
  {$IFDEF OLD_SOCKETS}
  Result := FTcpClient.Active;
  {$ELSE}
  Result := FTcpClient.IsOpen;
  {$ENDIF}
end;
1231 | ||
// Opening is delegated to the socket object.
procedure TTcpSocketStreamImpl.Open;
begin
  FTcpClient.Open;
end;
1236 | ||
1237 | ||
1238 | {$IFDEF OLD_SOCKETS} | |
// Wrapper around select() for the socket of FTcpClient.
//   ReadReady, WriteReady, ExceptFlag
//              optional; each pointer that is assigned makes the socket part
//              of the matching fd_set and receives the FD_ISSET result
//   TimeOut    milliseconds; a negative value waits without time limit
//   wsaError   WSAEINVAL if the client is not active, WSAGetLastError if
//              select() returned SOCKET_ERROR, 0 otherwise
// Returns the result of select(), or SOCKET_ERROR if the client is not
// active or an exception occurred during the call.
function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
                                      TimeOut: Integer; var wsaError : Integer): Integer;
var
  fdsRead, fdsWrite, fdsExcept : TFDset;
  pRead, pWrite, pExcept       : PFDset;
  tv       : timeval;
  pTimeout : PTimeval;
  hSock    : TSocket;

  // nil if the caller is not interested in this condition, otherwise
  // the address of "fds" after the socket has been put into it
  function Prepare( const pWanted : PBoolean; var fds : TFDset) : PFDset;
  begin
    if Assigned(pWanted) then begin
      Result := @fds;
      FD_ZERO(fds);
      FD_SET(hSock, fds);
    end
    else begin
      Result := nil;
    end;
  end;

begin
  if not FTcpClient.Active then begin
    wsaError := WSAEINVAL;
    Exit( SOCKET_ERROR);
  end;

  hSock := FTcpClient.Handle;

  pRead   := Prepare( ReadReady,  fdsRead);
  pWrite  := Prepare( WriteReady, fdsWrite);
  pExcept := Prepare( ExceptFlag, fdsExcept);

  // split the milliseconds into seconds and microseconds
  if TimeOut < 0 then begin
    pTimeout := nil;  // wait forever
  end
  else begin
    tv.tv_sec  := TimeOut div 1000;
    tv.tv_usec := 1000 * (TimeOut mod 1000);
    pTimeout   := @tv;
  end;

  wsaError := 0;
  try
    {$IFDEF MSWINDOWS}
      {$IFDEF OLD_UNIT_NAMES}
      result := WinSock.select( hSock + 1, pRead, pWrite, pExcept, pTimeout);
      {$ELSE}
      result := Winapi.WinSock.select( hSock + 1, pRead, pWrite, pExcept, pTimeout);
      {$ENDIF}
    {$ENDIF}
    {$IFDEF LINUX}
      result := Libc.select( hSock + 1, pRead, pWrite, pExcept, pTimeout);
    {$ENDIF}

    if result = SOCKET_ERROR
    then wsaError := WSAGetLastError;

  except
    result := SOCKET_ERROR;
  end;

  // report the state of each requested condition
  if Assigned(ReadReady)
  then ReadReady^ := FD_ISSET(hSock, fdsRead);

  if Assigned(WriteReady)
  then WriteReady^ := FD_ISSET(hSock, fdsWrite);

  if Assigned(ExceptFlag)
  then ExceptFlag^ := FD_ISSET(hSock, fdsExcept);
end;
1324 | {$ENDIF} | |
1325 | ||
1326 | {$IFDEF OLD_SOCKETS} | |
// Waits up to TimeOut milliseconds until the socket has data to read and
// peeks at it, i.e. the data is copied to pBuf but stays in the socket.
//   DesiredBytes  maximum number of bytes to peek at
//   wsaError      error code of a failed select() or recv() call
//   bytesReady    number of bytes peeked, 0 unless wfd_HaveData is returned
// Returns wfd_HaveData, wfd_Timeout or wfd_Error.
function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
                                           DesiredBytes : Integer;
                                           var wsaError, bytesReady : Integer): TWaitForData;
var bCanRead, bError : Boolean;
    retval : Integer;
const
  MSG_PEEK = {$IFDEF OLD_UNIT_NAMES} WinSock.MSG_PEEK {$ELSE} Winapi.WinSock.MSG_PEEK {$ENDIF};
begin
  bytesReady := 0;

  // The select function returns the total number of socket handles that are ready
  // and contained in the fd_set structures, zero if the time limit expired,
  // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
  // WSAGetLastError can be used to retrieve a specific error code.
  retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
  if retval = SOCKET_ERROR
  then Exit( TWaitForData.wfd_Error);
  if (retval = 0) or not bCanRead
  then Exit( TWaitForData.wfd_Timeout);

  // recv() returns the number of bytes received, or -1 if an error occurred.
  // The return value will be 0 when the peer has performed an orderly shutdown.

  retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, MSG_PEEK);
  if retval <= 0 then begin
    // Select has left wsaError at 0; a failed recv() has to report its own
    // error code, otherwise the caller gets wfd_Error together with "no error".
    // An orderly shutdown (0) is not a socket error and keeps wsaError = 0.
    if retval < 0
    then wsaError := WSAGetLastError;
    Exit( TWaitForData.wfd_Error);
  end;

  // at least we have some data
  bytesReady := Min( retval, DesiredBytes);
  result := TWaitForData.wfd_HaveData;
end;
1358 | {$ENDIF} | |
1359 | ||
1360 | {$IFDEF OLD_SOCKETS} | |
// Old sockets version.
// Reads up to "count" bytes from the socket into pBuf, starting "offset"
// bytes behind pBuf, and returns the number of bytes actually read.
// The first wait uses FTimeout (DEFAULT_THRIFT_TIMEOUT if FTimeout is not
// positive); once data has arrived, the following waits use a tenth of
// that, but at least 200 ms.
// Reading stops early on a socket error, or on a timeout when FTimeout = 0.
// Raises TTransportExceptionTimedOut on a timeout when FTimeout <> 0.
function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
var wfd : TWaitForData;
    wsaError,
    msecs : Integer;
    nBytes : Integer;
    pTmp : PByte;
begin
  inherited;

  if FTimeout > 0
  then msecs := FTimeout
  else msecs := DEFAULT_THRIFT_TIMEOUT;

  result := 0;
  pTmp := pBuf;
  Inc( pTmp, offset);
  while count > 0 do begin

    // wait until there is something to read
    while TRUE do begin
      wfd := WaitForData( msecs, pTmp, count, wsaError, nBytes);
      case wfd of
        TWaitForData.wfd_Error    :  Exit;
        TWaitForData.wfd_HaveData :  Break;
        TWaitForData.wfd_Timeout  :  begin
          if (FTimeout = 0)
          then Exit
          else begin
            raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));

          end;
        end;
      else
        ASSERT( FALSE);
      end;
    end;

    // reduce the timeout once we got data
    if FTimeout > 0
    then msecs := FTimeout div 10
    else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
    msecs := Max( msecs, 200);

    ASSERT( nBytes <= count);
    nBytes := FTcpClient.ReceiveBuf( pTmp^, nBytes);

    // A failed receive returns a non-positive value. Adding that to pointer
    // and counters would move the write position backwards, enlarge "count"
    // and reduce the result, so report what has been read so far instead.
    if nBytes <= 0
    then Exit;

    Inc( pTmp, nBytes);
    Dec( count, nBytes);
    Inc( result, nBytes);
  end;
end;
1411 | ||
// Old sockets version.
// Sizes the result to FTcpClient.BytesReceived (0 if not open) and, if that
// is more than 0, receives that many bytes from the socket into it.
function TTcpSocketStreamImpl.ToArray: TBytes;
var cbData : Integer;
begin
  if IsOpen
  then cbData := FTcpClient.BytesReceived
  else cbData := 0;

  SetLength( Result, cbData);
  if cbData <= 0 then Exit;

  FTcpClient.ReceiveBuf( Pointer(@Result[0])^, cbData);
end;
1427 | ||
// Old sockets version.
// Waits (using FTimeOut) until the socket is writable, then sends "count"
// bytes taken "offset" bytes behind pBuf.
// Raises TTransportExceptionNotOpen if the client is not active,
//        TTransportExceptionTimedOut if select() reports no ready socket,
//        TTransportExceptionUnknown on select() errors or if the socket is
//        flagged as failed or not writable.
procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
var
  bWritable, bFailed : Boolean;
  rc, wsaError : Integer;
  pSrc : PByte;
begin
  inherited;

  if not FTcpClient.Active then begin
    raise TTransportExceptionNotOpen.Create('not open');
  end;

  // select() returns the number of ready socket handles, zero if the time
  // limit expired, or SOCKET_ERROR; in the latter case wsaError holds the
  // error code.
  rc := Self.Select( nil, @bWritable, @bFailed, FTimeOut, wsaError);

  if rc = SOCKET_ERROR then begin
    raise TTransportExceptionUnknown.Create(SysErrorMessage(Cardinal(wsaError)));
  end;

  if rc = 0 then begin
    raise TTransportExceptionTimedOut.Create('timed out');
  end;

  if bFailed or not bWritable then begin
    raise TTransportExceptionUnknown.Create('unknown error');
  end;

  pSrc := pBuf;
  Inc( pSrc, offset);
  FTcpClient.SendBuf( pSrc^, count);
end;
1457 | ||
1458 | {$ELSE} | |
1459 | ||
// New sockets version.
// Reads from the socket into pBuf, starting "offset" bytes behind pBuf,
// until "count" bytes have arrived or the socket delivers 0 bytes.
// Returns the number of bytes read.
function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
var
  cbGot : Integer;
  pDest : PByte;
begin
  inherited;

  Result := 0;
  pDest := pBuf;
  Inc( pDest, offset);

  while count > 0 do begin
    cbGot := FTcpClient.Read( pDest^, count);
    if cbGot = 0 then Break;  // socket delivered nothing: stop here

    Inc( Result, cbGot);
    Inc( pDest, cbGot);
    Dec( count, cbGot);
  end;
end;
1478 | ||
// New sockets version.
// If Peek reports pending data, reads from the socket in portions of 1024
// bytes until a portion comes back incomplete, and returns exactly the
// bytes read. A TTransportException during reading ends the operation
// silently; whatever was read until then is returned. Other exceptions
// are passed on to the caller.
function TTcpSocketStreamImpl.ToArray: TBytes;
const
  CHUNK_SIZE = 1024;
var
  total, got : Integer;
begin
  // Start from a defined state: do not rely on Result being empty at entry.
  total := 0;
  SetLength( Result, 0);

  try
    if FTcpClient.Peek then begin
      repeat
        // make room for one more portion behind the bytes read so far
        SetLength( Result, total + CHUNK_SIZE);
        got := FTcpClient.Read( Result[total], CHUNK_SIZE);
        if got > 0
        then Inc( total, got);
      until got < CHUNK_SIZE;
    end;
  except
    on TTransportException do begin { don't allow default exceptions } end;
    else raise;
  end;

  // Always cut off the unused part of the last portion. Trimming only when
  // the last read returned more than 0 bytes left a tail of 1024 unused
  // bytes whenever the final read delivered nothing or raised an exception.
  SetLength( Result, total);
end;
1497 | ||
// New sockets version.
// Sends "count" bytes, taken "offset" bytes behind pBuf, via the socket.
// Raises TTransportExceptionNotOpen if the socket is not open.
procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
var pSrc : PByte;
begin
  inherited;

  if not FTcpClient.IsOpen then begin
    raise TTransportExceptionNotOpen.Create('not open');
  end;

  pSrc := pBuf;
  Inc( pSrc, offset);
  FTcpClient.Write( pSrc^, count);
end;
1511 | ||
1512 | {$ENDIF} | |
1513 | ||
1514 | ||
// TFramedTransportImpl_Initialize is declared exactly when HAVE_CLASS_CTOR
// is not defined (otherwise a class constructor does the same job), so the
// call is guarded by the very same symbol. Guarding it by a compiler version
// check instead breaks as soon as both conditions disagree: either the
// procedure is called without being declared, or FHeader_Dummy is never set up.
{$IFNDEF HAVE_CLASS_CTOR}
initialization
begin
  TFramedTransportImpl_Initialize;
end;
{$ENDIF}
1521 | ||
1522 | ||
1523 | end. |