]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | %% |
2 | %% Licensed to the Apache Software Foundation (ASF) under one | |
3 | %% or more contributor license agreements. See the NOTICE file | |
4 | %% distributed with this work for additional information | |
5 | %% regarding copyright ownership. The ASF licenses this file | |
6 | %% to you under the Apache License, Version 2.0 (the | |
7 | %% "License"); you may not use this file except in compliance | |
8 | %% with the License. You may obtain a copy of the License at | |
9 | %% | |
10 | %% http://www.apache.org/licenses/LICENSE-2.0 | |
11 | %% | |
12 | %% Unless required by applicable law or agreed to in writing, | |
13 | %% software distributed under the License is distributed on an | |
14 | %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | %% KIND, either express or implied. See the License for the | |
16 | %% specific language governing permissions and limitations | |
17 | %% under the License. | |
18 | %% | |
19 | ||
20 | -module(thrift_socket_transport). | |
21 | ||
22 | -behaviour(thrift_transport). | |
23 | ||
24 | %% constructors | |
25 | -export([new/1, new/2]). | |
26 | %% transport callbacks | |
27 | -export([read/2, read_exact/2, write/2, flush/1, close/1]). | |
28 | %% legacy api | |
29 | -export([new_transport_factory/3]). | |
30 | ||
31 | ||
%% State carried by this transport between callback invocations.
-record(t_socket, {
    %% socket handle handed to gen_tcp by the callbacks in this module
    socket,
    %% timeout given to gen_tcp:recv/3: a positive number of milliseconds
    %% or the atom 'infinity'
    recv_timeout = 60000,
    %% iodata already received from the socket but not yet returned by a read
    buffer = []
}).

%% Transport state as seen by the callbacks in this module.
-type state() :: #t_socket{}.
39 | ||
40 | ||
%% Wraps Socket in a transport using the default options; equivalent to
%% new(Socket, []).
-spec new(Socket :: any()) -> thrift_transport:t_transport().
new(Socket) ->
    DefaultOpts = [],
    new(Socket, DefaultOpts).
45 | ||
%% Wraps Socket in a transport configured by Opts.
%%
%% Opts is a list of {recv_timeout, Timeout} tuples, where Timeout is a
%% positive integer (milliseconds) or 'infinity'. Any other element makes
%% parse_opts/2 fail with function_clause.
-spec new(Socket :: any(), Opts :: list()) -> thrift_transport:t_transport().
new(Socket, Opts) when is_list(Opts) ->
    Initial = #t_socket{socket = Socket},
    thrift_transport:new(?MODULE, parse_opts(Opts, Initial)).
52 | ||
53 | ||
%% Folds the option list into the transport state.
%%
%% Only {recv_timeout, Timeout} is understood, with Timeout either 'infinity'
%% or a strictly positive integer. There is deliberately no catch-all clause:
%% an unknown or malformed option raises function_clause.
parse_opts([], State) ->
    State;
parse_opts([{recv_timeout, Timeout} | Rest], State)
  when Timeout =:= infinity; is_integer(Timeout), Timeout > 0 ->
    parse_opts(Rest, State#t_socket{recv_timeout = Timeout}).
61 | ||
62 | ||
63 | -include("thrift_transport_behaviour.hrl"). | |
64 | ||
65 | ||
%% Reads up to Len bytes.
%%
%% If the buffer already holds at least Len bytes, exactly Len bytes are
%% returned from it and the rest stays buffered. Otherwise the request is
%% passed to recv/2, which pulls more data from the socket and may return
%% fewer than Len bytes.
%%
%% Returns {NewState, {ok, Binary}} or {State, {error, Reason}}.
read(State = #t_socket{buffer = Buf}, Len)
  when is_integer(Len), Len >= 0 ->
    case iolist_to_binary(Buf) of
        %% matches only when at least Len bytes are buffered
        <<Result:Len/binary, Remaining/binary>> ->
            {State#t_socket{buffer = Remaining}, {ok, Result}};
        _TooShort ->
            recv(State, Len)
    end.
75 | ||
%% Receives whatever the socket has available (length 0 in gen_tcp:recv/3),
%% appends it to the buffered data and hands back at most Len bytes; anything
%% beyond Len is kept in the buffer.
%%
%% On a receive error the socket is closed and the state is returned
%% unchanged alongside {error, Reason}.
recv(State = #t_socket{socket = Socket, recv_timeout = Timeout, buffer = Buf}, Len) ->
    case gen_tcp:recv(Socket, 0, Timeout) of
        {ok, Data} ->
            Available = iolist_to_binary([Buf, Data]),
            Take = min(byte_size(Available), Len),
            {Result, Remaining} = split_binary(Available, Take),
            {State#t_socket{buffer = Remaining}, {ok, Result}};
        {error, Reason} ->
            gen_tcp:close(Socket),
            {State, {error, Reason}}
    end.
87 | ||
88 | ||
%% Reads exactly Len bytes.
%%
%% When the buffer already covers the request it is served by read/2.
%% Otherwise only the missing number of bytes is requested from the socket,
%% and the result is the buffered bytes followed by the received ones; the
%% buffer is empty afterwards.
%%
%% On a receive error the socket is closed and the state (including its
%% buffer) is returned unchanged alongside {error, Reason}.
read_exact(State = #t_socket{socket = Socket,
                             recv_timeout = Timeout,
                             buffer = Buf}, Len)
  when is_integer(Len), Len >= 0 ->
    Buffered = iolist_to_binary(Buf),
    case Len - byte_size(Buffered) of
        Missing when Missing =< 0 ->
            read(State, Len);
        Missing ->
            case gen_tcp:recv(Socket, Missing, Timeout) of
                {ok, Data} ->
                    {State#t_socket{buffer = []},
                     {ok, <<Buffered/binary, Data/binary>>}};
                {error, Reason} ->
                    gen_tcp:close(Socket),
                    {State, {error, Reason}}
            end
    end.
103 | ||
104 | ||
%% Sends Data on the socket straight away (nothing is queued in the state).
%%
%% Returns {State, ok} on success. On failure the socket is closed and
%% {State, {error, Reason}} is returned.
write(State = #t_socket{socket = Socket}, Data) ->
    Outcome =
        case gen_tcp:send(Socket, Data) of
            ok ->
                ok;
            {error, _Reason} = Failure ->
                gen_tcp:close(Socket),
                Failure
        end,
    {State, Outcome}.
112 | ||
113 | ||
%% write/2 sends immediately, so there is no outgoing data to flush. The only
%% effect is that the receive-side buffer is reset to empty, dropping any
%% bytes that were buffered but not yet read.
flush(State) ->
    Emptied = State#t_socket{buffer = []},
    {Emptied, ok}.
116 | ||
117 | ||
%% Closes the underlying socket and returns the state unchanged together
%% with the result of gen_tcp:close/1.
close(State = #t_socket{socket = Socket}) ->
    Result = gen_tcp:close(Socket),
    {State, Result}.
120 | ||
121 | ||
122 | %% legacy api. left for compatibility | |
123 | ||
%% The following "local" record is filled in by parse_factory_options/3
%% below. These options can be passed to new_transport_factory/3 in a
%% proplists-style option list. They're parsed like this so it is an O(n)
%% operation instead of O(n^2).
-record(factory_opts, {
    %% timeout given to gen_tcp:connect/4: integer or 'infinity'
    connect_timeout = infinity,
    %% extra socket options appended after the fixed ones used on connect
    sockopts = [],
    %% true -> wrap in thrift_framed_transport,
    %% false -> wrap in thrift_buffered_transport
    framed = false
}).
133 | ||
%% Splits a proplists-style option list in one pass into
%%   * factory options, accumulated in a #factory_opts{} record
%%     (framed, sockopts, connect_timeout), and
%%   * transport options, accumulated as a list that is later handed to
%%     new/2 (recv_timeout).
%%
%% Returns {FactoryOpts, TransOpts}. There is no catch-all clause, so an
%% unknown option or a value of the wrong type raises function_clause.
parse_factory_options([{recv_timeout, Timeout} | Tail], Factory, Trans)
  when Timeout =:= infinity; is_integer(Timeout) ->
    parse_factory_options(Tail, Factory, [{recv_timeout, Timeout} | Trans]);
parse_factory_options([{connect_timeout, Timeout} | Tail], Factory, Trans)
  when Timeout =:= infinity; is_integer(Timeout) ->
    Updated = Factory#factory_opts{connect_timeout = Timeout},
    parse_factory_options(Tail, Updated, Trans);
parse_factory_options([{sockopts, SockOpts} | Tail], Factory, Trans)
  when is_list(SockOpts) ->
    Updated = Factory#factory_opts{sockopts = SockOpts},
    parse_factory_options(Tail, Updated, Trans);
parse_factory_options([{framed, Framed} | Tail], Factory, Trans)
  when is_boolean(Framed) ->
    Updated = Factory#factory_opts{framed = Framed},
    parse_factory_options(Tail, Updated, Trans);
parse_factory_options([], Factory, Trans) ->
    {Factory, Trans}.
147 | ||
148 | ||
%% Generates a "transport factory": a zero-arity fun which, each time it is
%% called, connects to Host:Port and returns a transport on top of the new
%% socket.
%%
%% Options is a proplists-style list understood by parse_factory_options/3:
%% {framed, boolean()}, {sockopts, list()}, {connect_timeout, T} and
%% {recv_timeout, T}.
%%
%% Returns {ok, Fun}. Calling Fun() yields {ok, Transport}, where the socket
%% transport is wrapped in thrift_framed_transport when framed is true and in
%% thrift_buffered_transport otherwise. If the connect does not succeed,
%% whatever gen_tcp:connect/4 produced is returned as-is.
new_transport_factory(Host, Port, Options) ->
    {FactoryOpts, TransOpts} = parse_factory_options(Options, #factory_opts{}, []),
    #factory_opts{
        connect_timeout = ConnectTimeout,
        sockopts = ExtraSockOpts,
        framed = Framed
    } = FactoryOpts,
    %% fixed options first, caller-supplied socket options after them
    SockOpts = [binary, {packet, 0}, {active, false}, {nodelay, true} | ExtraSockOpts],
    Factory =
        fun() ->
            %% old-style catch: an exception raised by the connect call is
            %% turned into a return value (e.g. {'EXIT', Reason}) instead of
            %% propagating
            case catch gen_tcp:connect(Host, Port, SockOpts, ConnectTimeout) of
                {ok, Sock} ->
                    {ok, Transport} = thrift_socket_transport:new(Sock, TransOpts),
                    {ok, Wrapped} =
                        case Framed of
                            true -> thrift_framed_transport:new(Transport);
                            false -> thrift_buffered_transport:new(Transport)
                        end,
                    {ok, Wrapped};
                Error ->
                    Error
            end
        end,
    {ok, Factory}.
176 |