]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/erl/src/thrift_socket_transport.erl
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / jaegertracing / thrift / lib / erl / src / thrift_socket_transport.erl
1 %%
2 %% Licensed to the Apache Software Foundation (ASF) under one
3 %% or more contributor license agreements. See the NOTICE file
4 %% distributed with this work for additional information
5 %% regarding copyright ownership. The ASF licenses this file
6 %% to you under the Apache License, Version 2.0 (the
7 %% "License"); you may not use this file except in compliance
8 %% with the License. You may obtain a copy of the License at
9 %%
10 %% http://www.apache.org/licenses/LICENSE-2.0
11 %%
12 %% Unless required by applicable law or agreed to in writing,
13 %% software distributed under the License is distributed on an
14 %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 %% KIND, either express or implied. See the License for the
16 %% specific language governing permissions and limitations
17 %% under the License.
18 %%
19
20 -module(thrift_socket_transport).
21
22 -behaviour(thrift_transport).
23
24 %% constructors
25 -export([new/1, new/2]).
26 %% transport callbacks
27 -export([read/2, read_exact/2, write/2, flush/1, close/1]).
28 %% legacy api
29 -export([new_transport_factory/3]).
30
31
%% State carried by one socket transport instance.
-record(t_socket, {
    %% Socket handle given to new/1,2; passed to every gen_tcp call.
    socket,
    %% Timeout handed to gen_tcp:recv/3: a positive integer or `infinity'.
    recv_timeout = 60000,
    %% Bytes already received but not yet returned to the caller
    %% (starts as an empty iolist, later holds a binary remainder).
    buffer = []
}).

%% NOTE(review): presumably referenced by the callback specs in the included
%% thrift_transport_behaviour.hrl -- confirm before removing.
-type state() :: #t_socket{}.
39
40
%% @doc Wraps `Socket' in a socket transport using the default options.
%% Equivalent to `new(Socket, [])'.
-spec new(Socket :: any()) -> thrift_transport:t_transport().
new(Socket) ->
    new(Socket, []).
45
%% @doc Wraps `Socket' in a socket transport configured by `Opts'.
%%
%% `Opts' is a list of `{recv_timeout, Timeout}' tuples as understood by
%% parse_opts/2; any other entry makes parse_opts/2 fail with
%% `function_clause'. The resulting state is handed to thrift_transport:new/2
%% with this module as the callback module.
-spec new(Socket :: any(), Opts :: list()) -> thrift_transport:t_transport().
new(Socket, Opts) when is_list(Opts) ->
    thrift_transport:new(?MODULE, parse_opts(Opts, #t_socket{socket = Socket})).
52
53
%% Folds the transport option list into the #t_socket{} state.
%% The only recognised option is `{recv_timeout, T}' where `T' is a positive
%% integer or `infinity'; when it appears more than once the last occurrence
%% wins. Anything else is deliberately left unmatched (function_clause).
parse_opts([], State) ->
    State;
parse_opts([{recv_timeout, Timeout} | Rest], State)
  when Timeout =:= infinity; is_integer(Timeout), Timeout > 0 ->
    parse_opts(Rest, State#t_socket{recv_timeout = Timeout}).
61
62
63 -include("thrift_transport_behaviour.hrl").
64
65
%% Transport callback: returns up to `Len' bytes.
%% If the buffer already holds at least `Len' bytes they are served from it
%% without touching the socket; otherwise one receive is attempted via recv/2,
%% which may return fewer than `Len' bytes.
read(State = #t_socket{buffer = Buf}, Len)
  when is_integer(Len), Len >= 0 ->
    Buffered = iolist_to_binary(Buf),
    if
        byte_size(Buffered) >= Len ->
            %% Enough data on hand: hand out the first Len bytes, keep the rest.
            <<Result:Len/binary, Remaining/binary>> = Buffered,
            {State#t_socket{buffer = Remaining}, {ok, Result}};
        true ->
            recv(State, Len)
    end.
75
%% Performs a single gen_tcp:recv/3 for whatever data is available (length 0)
%% and returns at most `Len' bytes of buffer ++ new data; the surplus stays in
%% the buffer. On a receive error the socket is closed and the state is
%% returned unchanged together with `{error, Reason}'.
recv(State = #t_socket{socket = Socket, buffer = Buf, recv_timeout = Timeout}, Len) ->
    case gen_tcp:recv(Socket, 0, Timeout) of
        {ok, Data} ->
            Combined = iolist_to_binary([Buf, Data]),
            Give = min(byte_size(Combined), Len),
            <<Result:Give/binary, Remaining/binary>> = Combined,
            {State#t_socket{buffer = Remaining}, {ok, Result}};
        {error, Error} ->
            gen_tcp:close(Socket),
            {State, {error, Error}}
    end.
87
88
%% Transport callback: returns exactly `Len' bytes or an error.
%%
%% When the buffer already holds `Len' bytes the request is delegated to
%% read/2. Otherwise the missing `Len - Have' bytes are requested from the
%% socket in one gen_tcp:recv/3 call and appended to the buffered bytes; the
%% buffer is empty afterwards. On a receive error the socket is closed and
%% the state (including its buffer) is returned unchanged with
%% `{error, Reason}'.
read_exact(State = #t_socket{socket = Socket, buffer = Buf, recv_timeout = Timeout}, Len)
  when is_integer(Len), Len >= 0 ->
    Buffered = iolist_to_binary(Buf),
    case byte_size(Buffered) of
        Have when Have >= Len ->
            read(State, Len);
        Have ->
            case gen_tcp:recv(Socket, Len - Have, Timeout) of
                {ok, Data} ->
                    %% Normalise through iolist_to_binary/1 (as recv/2 does):
                    %% a socket in list mode delivers Data as a list, which a
                    %% <<_/binary, Data/binary>> construction would reject
                    %% with badarg.
                    {State#t_socket{buffer = []},
                     {ok, iolist_to_binary([Buffered, Data])}};
                {error, Error} ->
                    gen_tcp:close(Socket),
                    {State, {error, Error}}
            end
    end.
103
104
%% Transport callback: sends `Data' on the socket immediately with
%% gen_tcp:send/2. If the send fails the socket is closed and the error tuple
%% is returned next to the unchanged state.
write(State = #t_socket{socket = Socket}, Data) ->
    Outcome = gen_tcp:send(Socket, Data),
    case Outcome of
        ok ->
            {State, ok};
        {error, Error} ->
            gen_tcp:close(Socket),
            {State, {error, Error}}
    end.
112
113
%% Transport callback: there is no pending output to push (write/2 sends
%% straight to the socket); this only resets the receive buffer to empty,
%% discarding any bytes it still held.
flush(State) ->
    Emptied = State#t_socket{buffer = []},
    {Emptied, ok}.
116
117
%% Transport callback: closes the underlying socket and returns the result of
%% gen_tcp:close/1 alongside the unchanged state.
close(State = #t_socket{socket = Socket}) ->
    Result = gen_tcp:close(Socket),
    {State, Result}.
120
121
122 %% legacy api. left for compatibility
123
%% The following "local" record is filled in by parse_factory_options/3
%% below. Its fields correspond to options that can be passed to
%% new_transport_factory/3 in a proplists-style option list. The list is
%% walked once into this record so that option handling is O(n) rather than
%% O(n^2).
-record(factory_opts, {
    %% Timeout argument for gen_tcp:connect/4.
    connect_timeout = infinity,
    %% Extra socket options appended after the built-in ones.
    sockopts = [],
    %% true -> wrap in thrift_framed_transport,
    %% false -> wrap in thrift_buffered_transport.
    framed = false
}).
133
%% Splits the option list given to new_transport_factory/3 in a single pass.
%%
%% Returns `{FactoryOpts, TransOpts}' where `FactoryOpts' is the updated
%% #factory_opts{} record (framed, sockopts, connect_timeout) and `TransOpts'
%% collects the `{recv_timeout, T}' entries that are forwarded to new/2.
%% Unknown or malformed options are intentionally not matched and fail with
%% `function_clause'.
%%
%% `recv_timeout' must be `infinity' or a positive integer -- the same rule
%% parse_opts/2 enforces -- so that an invalid value is rejected here, when
%% the factory is built, instead of crashing inside the factory fun after a
%% socket has already been connected.
parse_factory_options([], FactoryOpts, TransOpts) ->
    {FactoryOpts, TransOpts};
parse_factory_options([{framed, Bool} | Rest], FactoryOpts, TransOpts)
  when is_boolean(Bool) ->
    parse_factory_options(Rest, FactoryOpts#factory_opts{framed = Bool}, TransOpts);
parse_factory_options([{sockopts, OptList} | Rest], FactoryOpts, TransOpts)
  when is_list(OptList) ->
    parse_factory_options(Rest, FactoryOpts#factory_opts{sockopts = OptList}, TransOpts);
parse_factory_options([{connect_timeout, TO} | Rest], FactoryOpts, TransOpts)
  when TO =:= infinity; is_integer(TO) ->
    parse_factory_options(Rest, FactoryOpts#factory_opts{connect_timeout = TO}, TransOpts);
parse_factory_options([{recv_timeout, TO} | Rest], FactoryOpts, TransOpts)
  when TO =:= infinity; is_integer(TO), TO > 0 ->
    %% Prepending keeps this O(1) per option; with duplicates, parse_opts/2
    %% then sees the later entries first, so the earliest one takes effect.
    parse_factory_options(Rest, FactoryOpts, [{recv_timeout, TO} | TransOpts]).
147
148
%% Builds a "transport factory": a zero-argument fun that, each time it is
%% called, connects to `Host':`Port' and returns a ready-to-use transport.
%%
%% `Options' is parsed by parse_factory_options/3 (framed, sockopts,
%% connect_timeout, recv_timeout). The call itself returns `{ok, Fun}'.
%%
%% `Fun()' connects with gen_tcp:connect/4 using binary, {packet, 0},
%% {active, false}, {nodelay, true} followed by the caller's sockopts. On
%% success the socket is wrapped in this module's transport and then in a
%% framed or buffered transport, giving `{ok, Transport}'. Any other result
%% of the connect attempt -- an `{error, Reason}' tuple or a term produced by
%% the surrounding `catch' -- is returned as is.
%%
%% The result can be passed into a protocol factory to generate a connection
%% to a thrift server over a socket.
new_transport_factory(Host, Port, Options) ->
    {FactoryOpts, TransOpts} = parse_factory_options(Options, #factory_opts{}, []),
    #factory_opts{
        connect_timeout = ConnectTimeout,
        sockopts = ExtraSockOpts,
        framed = Framed
    } = FactoryOpts,
    Factory =
        fun() ->
            SockOpts = [binary, {packet, 0}, {active, false}, {nodelay, true}
                        | ExtraSockOpts],
            case catch gen_tcp:connect(Host, Port, SockOpts, ConnectTimeout) of
                {ok, Sock} ->
                    {ok, Transport} = thrift_socket_transport:new(Sock, TransOpts),
                    Wrapped =
                        case Framed of
                            true -> thrift_framed_transport:new(Transport);
                            false -> thrift_buffered_transport:new(Transport)
                        end,
                    %% Assert the wrapper succeeded before handing it out.
                    {ok, BufTransport} = Wrapped,
                    {ok, BufTransport};
                Error ->
                    Error
            end
        end,
    {ok, Factory}.
176