]>
git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/rb/lib/thrift/transport/socket.rb
3 # Licensed to the Apache Software Foundation (ASF) under one
4 # or more contributor license agreements. See the NOTICE file
5 # distributed with this work for additional information
6 # regarding copyright ownership. The ASF licenses this file
7 # to you under the Apache License, Version 2.0 (the
8 # "License"); you may not use this file except in compliance
9 # with the License. You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing,
14 # software distributed under the License is distributed on an
15 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 # KIND, either express or implied. See the License for the
17 # specific language governing permissions and limitations
24 class Socket
< BaseTransport
25 def initialize(host
='localhost', port
=9090, timeout
=nil)
29 @desc = "#{host}:#{port}"
33 attr_accessor
:handle, :timeout
36 for addrinfo
in ::Socket::getaddrinfo(@host, @port, nil, ::Socket::SOCK_STREAM) do
38 socket
= ::Socket.new(addrinfo
[4], ::Socket::SOCK_STREAM, 0)
39 socket
.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1)
40 sockaddr
= ::Socket.sockaddr_in(addrinfo
[1], addrinfo
[3])
42 socket
.connect_nonblock(sockaddr
)
43 rescue Errno
::EINPROGRESS
44 unless IO
.select(nil, [ socket
], nil, @timeout)
48 socket
.connect_nonblock(sockaddr
)
52 return @handle = socket
53 rescue StandardError
=> e
57 raise TransportException
.new(TransportException
::NOT_OPEN, "Could not connect to #{@desc}: #{e}")
61 !@handle.nil? and !@handle.closed
?
65 raise IOError
, "closed stream" unless open
?
66 str
= Bytes
.force_binary_encoding(str
)
68 if @timeout.nil? or @timeout == 0
73 while Time
.now
- start
< @timeout
74 rd
, wr
, = IO
.select(nil, [@handle], nil, @timeout)
75 if wr
and not wr
.empty
?
76 len
+= @handle.write_nonblock(str
[len
..-1])
77 break if len
>= str
.length
81 raise TransportException
.new(TransportException
::TIMED_OUT, "Socket: Timed out writing #{str.length} bytes to #{@desc}")
86 rescue TransportException
=> e
89 rescue StandardError
=> e
92 raise TransportException
.new(TransportException
::NOT_OPEN, e
.message
)
97 raise IOError
, "closed stream" unless open
?
100 if @timeout.nil? or @timeout == 0
101 data = @handle.readpartial(sz
)
103 # it's possible to interrupt select for something other than the timeout
104 # so we need to ensure we've waited long enough, but not too long
108 rd
, = IO
.select([@handle], nil, nil, @timeout - timespent
)
109 timespent
= Time
.now
- start
110 break rd
if (rd
and not rd
.empty
?) or timespent
>= @timeout
112 if rd
.nil? or rd
.empty
?
113 raise TransportException
.new(TransportException
::TIMED_OUT, "Socket: Timed out reading #{sz} bytes from #{@desc}")
115 data = @handle.readpartial(sz
)
118 rescue TransportException
=> e
119 # don't let this get caught by the StandardError handler
121 rescue StandardError
=> e
122 @handle.close
unless @handle.closed
?
124 raise TransportException
.new(TransportException
::NOT_OPEN, e
.message
)
126 if (data.nil? or data.length
== 0)
127 raise TransportException
.new(TransportException
::UNKNOWN, "Socket: Could not read #{sz} bytes from #{@desc}")
133 @handle.close
unless @handle.nil? or @handle.closed
?
140 "socket(#{@host}:#{@port})"