]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/perl/lib/Thrift/Socket.pm
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / perl / lib / Thrift / Socket.pm
1 #
2 # Licensed to the Apache Software Foundation (ASF) under one
3 # or more contributor license agreements. See the NOTICE file
4 # distributed with this work for additional information
5 # regarding copyright ownership. The ASF licenses this file
6 # to you under the Apache License, Version 2.0 (the
7 # "License"); you may not use this file except in compliance
8 # with the License. You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing,
13 # software distributed under the License is distributed on an
14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 # KIND, either express or implied. See the License for the
16 # specific language governing permissions and limitations
17 # under the License.
18 #
19
20 use 5.10.0;
21 use strict;
22 use warnings;
23
24 use Thrift;
25 use Thrift::Exception;
26 use Thrift::Transport;
27
28 use IO::Socket::INET;
29 use IO::Select;
30
31 package Thrift::Socket;
32 use base qw( Thrift::Transport );
33 use version 0.77; our $VERSION = version->declare("$Thrift::VERSION");
34
#
# Construction and usage
#
#   my %opts   = ( host => 'localhost', port => 9090 );
#   my $socket = Thrift::Socket->new(\%opts);
#
# Recognised options (every key of the hash is copied onto the object):
#
#   host        => host to connect to (default 'localhost')
#   port        => port to connect to (default 9090)
#   sendTimeout => timeout in milliseconds used for send and for connect
#                  (default 10000)
#   recvTimeout => timeout in milliseconds used for recv (default 10000)
#
# The older positional form Thrift::Socket->new($host, $port) is still
# accepted; both arguments are optional.
#

sub new
{
    my ($class, $first, @rest) = @_;

    # Start from the defaults; "handle" stays undef until open() is called.
    my %settings = (
        host        => 'localhost',
        port        => 9090,
        recvTimeout => 10000,
        sendTimeout => 10000,
        handle      => undef,
    );

    if (defined($first) && ref($first) eq 'HASH') {
        # Options hash: every supplied key overrides (or extends) the defaults.
        @settings{ keys %$first } = values %$first;
    }
    else {
        # Positional form: host first, then port, falling back to the defaults.
        $settings{host} = $first   || 'localhost';
        $settings{port} = $rest[0] || 9090;
    }

    return bless(\%settings, $class);
}
79
80
#
# Sets the timeout used when sending and when connecting.
#
# @param int $timeout Timeout in milliseconds
#
sub setSendTimeout
{
    my ($self, $timeout) = @_;

    return $self->{sendTimeout} = $timeout;
}
88
#
# Sets the timeout used when waiting for data to read.
#
# @param int $timeout Timeout in milliseconds
#
sub setRecvTimeout
{
    my ($self, $timeout) = @_;

    return $self->{recvTimeout} = $timeout;
}
96
97
#
# Tests whether this socket is open.
#
# @return a true value if a socket has been opened and is still connected
#         to its peer, a false value otherwise
#
sub isOpen
{
    my $self = shift;

    return 0 unless defined $self->{handle};

    # The select set normally holds the one socket stored by open(), but it
    # may be empty or hold a socket that has already been closed.  Check both
    # before asking for the peer: connected() on undef would die, and on a
    # closed socket it would emit a "getpeername() on closed socket" warning.
    my ($sock) = $self->{handle}->handles();
    return 0 unless defined $sock && $sock->opened;

    return $sock->connected;
}
113
#
# Connects the socket.
#
# The connection is made by the overridable __open() method and the
# resulting socket is stored in an IO::Select set under $self->{handle}.
#
# Dies with a Thrift::TTransportException (NOT_OPEN) if __open() does not
# return a socket.
#
sub open
{
    my ($self) = @_;

    my $sock = $self->__open();

    unless ($sock) {
        # Build the message straight away so that $! still refers to the
        # failed connection attempt.
        my $error = ref($self) . ": Could not connect to $self->{host}:$self->{port} ($!)";
        die Thrift::TTransportException->new($error, Thrift::TTransportException::NOT_OPEN);
    }

    return $self->{handle} = IO::Select->new($sock);
}
128
#
# Closes the socket.
#
# Does nothing when the socket was never opened or is already closed.
#
# @return the result of __close(), or nothing if there was nothing to close
#
sub close
{
    my $self = shift;

    return unless defined $self->{handle};

    my $closed = $self->__close();

    # Forget the select set once its socket is closed.  Keeping it around
    # would let isOpen(), read() and write() keep working on a dead (and
    # possibly reused) file descriptor, and would make a second close()
    # try to close the same socket again.
    $self->{handle} = undef;

    return $closed;
}
139
#
# Reads exactly $len bytes from the socket, waiting for and collecting as
# many partial reads as it takes.
#
# @param int $len How many bytes
# @return string Binary data (nothing if the socket was never opened)
#
# Dies with a Thrift::TTransportException (END_OF_FILE) if a read yields no
# data before $len bytes have arrived.
#
sub readAll
{
    my ($self, $len) = @_;

    return unless defined $self->{handle};

    my $collected = '';

    for (;;) {
        my $sock  = $self->__wait();
        my $chunk = $self->__recv($sock, $len);

        unless (defined $chunk && $chunk ne '') {
            die Thrift::TTransportException->new(
                ref($self) . ': Could not read ' . $len . ' bytes from ' . $self->{host} . ':' . $self->{port},
                Thrift::TTransportException::END_OF_FILE
            );
        }

        my $got = length($chunk);

        # Everything that was still missing has arrived.
        return $collected . $chunk if $got >= $len;

        # Short read: keep what we have and ask only for the remainder.
        $collected .= $chunk;
        $len       -= $got;
    }
}
177
#
# Reads from the socket: waits until it is readable and returns whatever a
# single receive of up to $len bytes delivers.
#
# @param int $len How many bytes (at most)
# @return string Binary data (nothing if the socket was never opened)
#
# Dies with a Thrift::TTransportException (END_OF_FILE) if the receive
# yields no data.
#
sub read
{
    my ($self, $len) = @_;

    return unless defined $self->{handle};

    my $sock = $self->__wait();
    my $data = $self->__recv($sock, $len);

    return $data if defined $data && $data ne '';

    die Thrift::TTransportException->new(
        ref($self) . ': Could not read ' . $len . ' bytes from ' . $self->{host} . ':' . $self->{port},
        Thrift::TTransportException::END_OF_FILE
    );
}
203
204
#
# Write to the socket.
#
# Keeps sending until the whole buffer has been handed to the socket,
# waiting up to sendTimeout milliseconds for the socket to become writable
# before each send.  Does nothing if the socket was never opened.
#
# @param string $buf The data to write
#
# Dies with a Thrift::TTransportException:
#   TIMED_OUT    if the socket does not become writable in time
#   END_OF_FILE  if a send fails or sends nothing
#
sub write
{
    my $self = shift;
    my $buf  = shift;

    return unless defined $self->{handle};

    while (length($buf) > 0) {
        # check for timeout
        my @sockets = $self->{handle}->can_write( $self->{sendTimeout} / 1000 );

        if (@sockets == 0) {
            die Thrift::TTransportException->new(ref($self).': timed out writing to '.
                $self->{host}.':'.$self->{port}, Thrift::TTransportException::TIMED_OUT);
        }

        my $sent = $self->__send($sockets[0], $buf);

        if (!defined $sent || $sent == 0) {
            # Report the peer as host:port (this used to print the host twice).
            die Thrift::TTransportException->new(ref($self).': Could not write '.length($buf).' bytes to '.
                $self->{host}.':'.$self->{port}, Thrift::TTransportException::END_OF_FILE);
        }

        # A send may be partial; keep only the unsent tail for the next pass.
        $buf = substr($buf, $sent);
    }

    return;
}
238
#
# Flush output to the socket.
#
# Does nothing if the socket was never opened; otherwise returns the result
# of the socket's own flush().
#
sub flush
{
    my ($self) = @_;

    return unless defined $self->{handle};

    # The select set holds the socket; take its first handle.
    my ($sock) = $self->{handle}->handles();

    return scalar($sock->flush);
}
250
251 ###
252 ### Overridable methods
253 ###
254
#
# Open a connection to a server.
#
# Makes a TCP connection to the configured host and port, using sendTimeout
# (milliseconds, converted to seconds) as the connect timeout.
#
# @returns whatever IO::Socket::INET->new returns for these settings
#
sub __open
{
    my ($self) = @_;

    my %peer = (
        PeerAddr => $self->{host},
        PeerPort => $self->{port},
        Proto    => 'tcp',
        Timeout  => $self->{sendTimeout} / 1000,
    );

    return IO::Socket::INET->new(%peer);
}
266
#
# Close the connection
#
# Closes the first handle registered in the select set.
#
# @returns the result of the underlying close
#
sub __close
{
    my ($self) = @_;

    my ($sock) = $self->{handle}->handles();

    return CORE::close($sock);
}
275
#
# Read data
#
# Performs a single receive on the given socket.
#
# @param[in] $sock the socket
# @param[in] $len the length to read
# @returns the data buffer that was read
#
sub __recv
{
    my ($self, $sock, $len) = @_;

    my $data;
    $sock->recv($data, $len);

    return $data;
}
292
#
# Send data
#
# Performs a single send on the given socket.
#
# @param[in] $sock the socket
# @param[in] $buf the data buffer
# @returns the number of bytes written
#
sub __send
{
    my ($self, $sock, $buf) = @_;

    return $sock->send($buf);
}
307
#
# Wait for data to be readable
#
# Waits up to recvTimeout milliseconds for the select set to report a
# readable socket.
#
# @returns a socket that can be read
#
# Dies with a Thrift::TTransportException (TIMED_OUT) if nothing becomes
# readable in time.
#
sub __wait
{
    my ($self) = @_;

    my @ready = $self->{handle}->can_read( $self->{recvTimeout} / 1000 );

    return $ready[0] if @ready;

    die Thrift::TTransportException->new(
        ref($self) . ': timed out reading from ' . $self->{host} . ':' . $self->{port},
        Thrift::TTransportException::TIMED_OUT
    );
}
325
326
327 1;