2 %% Licensed to the Apache Software Foundation (ASF) under one
3 %% or more contributor license agreements. See the NOTICE file
4 %% distributed with this work for additional information
5 %% regarding copyright ownership. The ASF licenses this file
6 %% to you under the Apache License, Version 2.0 (the
7 %% "License"); you may not use this file except in compliance
8 %% with the License. You may obtain a copy of the License at
10 %% http://www.apache.org/licenses/LICENSE-2.0
12 %% Unless required by applicable law or agreed to in writing,
13 %% software distributed under the License is distributed on an
14 %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 %% KIND, either express or implied. See the License for the
16 %% specific language governing permissions and limitations
20 -module(thrift_reconnecting_client).
22 -behaviour(gen_server).
27 get_and_reset_stats/1 ]).
29 -export([ start_link/6 ]).
31 %% gen_server callbacks
39 -record( state, { client = nil,
50 %%====================================================================
52 %%====================================================================
53 %%--------------------------------------------------------------------
54 %% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
55 %% Description: Starts the server
56 %%--------------------------------------------------------------------
57 start_link( Host, Port,
58 ThriftSvc, ThriftOpts,
59 ReconnMin, ReconnMax ) ->
60 gen_server:start_link( ?MODULE,
62 ThriftSvc, ThriftOpts,
63 ReconnMin, ReconnMax ],
66 call( Pid, Op, Args ) ->
67 gen_server:call( Pid, { call, Op, Args } ).
70 gen_server:call( Pid, get_stats ).
72 get_and_reset_stats( Pid ) ->
73 gen_server:call( Pid, get_and_reset_stats ).
75 %%====================================================================
76 %% gen_server callbacks
77 %%====================================================================
79 %%--------------------------------------------------------------------
80 %% Function: init(Args) -> {ok, State} |
81 %% {ok, State, Timeout} |
84 %% Description: Start the server.
85 %%--------------------------------------------------------------------
86 init( [ Host, Port, TSvc, TOpts, ReconnMin, ReconnMax ] ) ->
87 process_flag( trap_exit, true ),
89 State = #state{ host = Host,
93 reconn_min = ReconnMin,
94 reconn_max = ReconnMax,
95 op_cnt_dict = dict:new(),
96 op_time_dict = dict:new() },
98 { ok, try_connect( State ) }.
100 %%--------------------------------------------------------------------
101 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
102 %% {reply, Reply, State, Timeout} |
103 %% {noreply, State} |
104 %% {noreply, State, Timeout} |
105 %% {stop, Reason, Reply, State} |
106 %% {stop, Reason, State}
107 %% Description: Handling call messages
108 %%--------------------------------------------------------------------
109 handle_call( { call, Op, _ },
111 State = #state{ client = nil } ) ->
112 { reply, { error, noconn }, incr_stats( Op, "failfast", 1, State ) };
114 handle_call( { call, Op, Args },
116 State=#state{ client = Client } ) ->
119 Result = ( catch thrift_client:call( Client, Op, Args) ),
123 { C, { ok, Reply } } ->
124 S = incr_stats( Op, "success", Time, State#state{ client = C } ),
125 { reply, {ok, Reply }, S };
126 { _, { E, Msg } } when E == error; E == exception ->
127 S = incr_stats( Op, "error", Time, try_connect( State ) ),
128 { reply, { E, Msg }, S };
130 S = incr_stats( Op, "error", Time, try_connect( State ) ),
134 handle_call( get_stats,
136 State = #state{} ) ->
137 { reply, stats( State ), State };
139 handle_call( get_and_reset_stats,
141 State = #state{} ) ->
142 { reply, stats( State ), reset_stats( State ) }.
144 %%--------------------------------------------------------------------
145 %% Function: handle_cast(Msg, State) -> {noreply, State} |
146 %% {noreply, State, Timeout} |
147 %% {stop, Reason, State}
148 %% Description: Handling cast messages
149 %%--------------------------------------------------------------------
150 handle_cast( _Msg, State ) ->
153 %%--------------------------------------------------------------------
154 %% Function: handle_info(Info, State) -> {noreply, State} |
155 %% {noreply, State, Timeout} |
156 %% {stop, Reason, State}
157 %% Description: Handling all non call/cast messages
158 %%--------------------------------------------------------------------
159 handle_info( try_connect, State ) ->
160 { noreply, try_connect( State ) };
162 handle_info( _Info, State ) ->
165 %%--------------------------------------------------------------------
166 %% Function: terminate(Reason, State) -> void()
167 %% Description: This function is called by a gen_server when it is about to
168 %% terminate. It should be the opposite of Module:init/1 and do any necessary
169 %% cleaning up. When it returns, the gen_server terminates with Reason.
170 %% The return value is ignored.
171 %%--------------------------------------------------------------------
172 terminate( _Reason, #state{ client = Client } ) ->
173 thrift_client:close( Client ),
176 %%--------------------------------------------------------------------
177 %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
178 %% Description: Convert process state when code is changed
179 %%--------------------------------------------------------------------
180 code_change( _OldVsn, State, _Extra ) ->
183 %%--------------------------------------------------------------------
184 %%% Internal functions
185 %%--------------------------------------------------------------------
186 try_connect( State = #state{ client = OldClient,
190 thrift_opts = TOpts } ) ->
194 _ -> ( catch thrift_client:close( OldClient ) )
197 case catch thrift_client_util:new( Host, Port, TSvc, TOpts ) of
199 State#state{ client = Client, reconn_time = 0 };
200 { E, Msg } when E == error; E == exception ->
201 ReconnTime = reconn_time( State ),
202 error_logger:error_msg( "[~w] ~w connect failed (~w), trying again in ~w ms~n",
203 [ self(), TSvc, Msg, ReconnTime ] ),
204 erlang:send_after( ReconnTime, self(), try_connect ),
205 State#state{ client = nil, reconn_time = ReconnTime }
209 reconn_time( #state{ reconn_min = ReconnMin, reconn_time = 0 } ) ->
211 reconn_time( #state{ reconn_max = ReconnMax, reconn_time = ReconnMax } ) ->
213 reconn_time( #state{ reconn_max = ReconnMax, reconn_time = R } ) ->
215 case Backoff > ReconnMax of
220 -ifdef(time_correction).
222 T1 = erlang:monotonic_time(),
224 T2 = erlang:monotonic_time(),
225 erlang:convert_time_unit(T2 - T1, native, micro_seconds)
229 T1 = erlang:timestamp(),
231 T2 = erlang:timestamp(),
232 timer:now_diff(T2, T1)
236 incr_stats( Op, Result, Time,
237 State = #state{ op_cnt_dict = OpCntDict,
238 op_time_dict = OpTimeDict } ) ->
239 Key = lists:flatten( [ atom_to_list( Op ), [ "_" | Result ] ] ),
240 State#state{ op_cnt_dict = dict:update_counter( Key, 1, OpCntDict ),
241 op_time_dict = dict:update_counter( Key, Time, OpTimeDict ) }.
244 stats( #state{ thrift_svc = TSvc,
245 op_cnt_dict = OpCntDict,
246 op_time_dict = OpTimeDict } ) ->
247 Svc = atom_to_list(TSvc),
249 F = fun( Key, Count, Stats ) ->
250 Name = lists:flatten( [ Svc, [ "_" | Key ] ] ),
251 Micros = dict:fetch( Key, OpTimeDict ),
252 [ { Name, Count, Micros } | Stats ]
255 dict:fold( F, [], OpCntDict ).
257 reset_stats( State = #state{} ) ->
258 State#state{ op_cnt_dict = dict:new(), op_time_dict = dict:new() }.