]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/d/src/thrift/codegen/client_pool.d
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / d / src / thrift / codegen / client_pool.d
CommitLineData
f67539c2
TL
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19module thrift.codegen.client_pool;
20
21import core.time : dur, Duration, TickDuration;
22import std.traits : ParameterTypeTuple, ReturnType;
23import thrift.base;
24import thrift.codegen.base;
25import thrift.codegen.client;
26import thrift.internal.codegen;
27import thrift.internal.resource_pool;
28
/**
 * Manages a pool of TClients for the given interface, forwarding RPC calls to
 * members of the pool.
 *
 * If a request fails, another client from the pool is tried, and optionally,
 * a client is disabled for a configurable amount of time if it fails too
 * often. If all clients fail (and keepTrying is false), a
 * TCompoundOperationException is thrown, containing all the collected RPC
 * exceptions.
 */
class TClientPool(Interface) if (isService!Interface) : Interface {
  /// Shorthand for TClientBase!Interface, the client type this instance
  /// operates on.
  alias TClientBase!Interface Client;

  /**
   * Creates a new instance and adds the given clients to the pool.
   *
   * Also installs the default rpcFaultFilter, which treats
   * TTransportException and TApplicationException as client faults.
   */
  this(Client[] clients) {
    pool_ = new TResourcePool!Client(clients);

    rpcFaultFilter = (Exception e) {
      import thrift.protocol.base;
      import thrift.transport.base;
      return (
        (cast(TTransportException)e !is null) ||
        (cast(TApplicationException)e !is null)
      );
    };
  }

  /**
   * Executes an operation on the first currently active client.
   *
   * If the operation fails (throws an exception for which rpcFaultFilter is
   * true), the failure is recorded and the next client in the pool is tried.
   *
   * Params:
   *   work = The operation to run; it is handed the client to use and its
   *     return value becomes the return value of this method.
   *
   * Returns: Whatever work returned for the first client that did not fail
   *   with an rpc exception.
   *
   * Throws: Any non-rpc exception that occurs, a TCompoundOperationException
   *   if all clients failed with an rpc exception (if keepTrying is false).
   *
   * Example:
   * ---
   * interface Foo { string bar(); }
   * auto poolClient = tClientPool([tClient!Foo(someProtocol)]);
   * auto result = poolClient.execute((c){ return c.bar(); });
   * ---
   */
  ResultType execute(ResultType)(scope ResultType delegate(Client) work) {
    // Forward the caller's result type. Instantiating executeOnPool with
    // Client instead would only accept delegates that return a Client.
    return executeOnPool!ResultType(work);
  }

  /**
   * Adds a client to the pool.
   */
  void addClient(Client client) {
    pool_.add(client);
  }

  /**
   * Removes a client from the pool.
   *
   * Returns: Whether the client was found in the pool.
   */
  bool removeClient(Client client) {
    return pool_.remove(client);
  }

  // Generates one forwarding method per method of Interface; each of them
  // routes the call through executeOnPool.
  mixin(poolForwardCode!Interface());

  /// Whether to open the underlying transports of a client before trying to
  /// execute a method if they are not open. This is usually desirable
  /// because it allows e.g. to automatically reconnect to a remote server
  /// if the network connection is dropped.
  ///
  /// Defaults to true.
  bool reopenTransports = true;

  /// Called to determine whether an exception comes from a client from the
  /// pool not working properly, or if it is an exception thrown at the
  /// application level.
  ///
  /// If the delegate returns true, the server/connection is considered to be
  /// at fault, if it returns false, the exception is just passed on to the
  /// caller. If the delegate is null, every exception is passed on to the
  /// caller.
  ///
  /// By default, returns true for instances of TTransportException and
  /// TApplicationException, false otherwise.
  bool delegate(Exception) rpcFaultFilter;

  /**
   * Whether to keep trying to find a working client if all have failed in a
   * row.
   *
   * Defaults to false.
   */
  bool keepTrying() const @property {
    return pool_.cycle;
  }

  /// Ditto
  void keepTrying(bool value) @property {
    pool_.cycle = value;
  }

  /**
   * Whether to use a random permutation of the client pool on every call to
   * execute(). This can be used e.g. as a simple form of load balancing.
   *
   * Defaults to true.
   */
  bool permuteClients() const @property {
    return pool_.permute;
  }

  /// Ditto
  void permuteClients(bool value) @property {
    pool_.permute = value;
  }

  /**
   * The number of consecutive faults after which a client is disabled until
   * faultDisableDuration has passed. 0 to never disable clients.
   *
   * Defaults to 0.
   */
  ushort faultDisableCount() @property {
    return pool_.faultDisableCount;
  }

  /// Ditto
  void faultDisableCount(ushort value) @property {
    pool_.faultDisableCount = value;
  }

  /**
   * The duration for which a client is no longer considered after it has
   * failed too often.
   *
   * Defaults to one second.
   */
  Duration faultDisableDuration() @property {
    return pool_.faultDisableDuration;
  }

  /// Ditto
  void faultDisableDuration(Duration value) @property {
    pool_.faultDisableDuration = value;
  }

protected:
  /**
   * Runs work against the clients handed out by the pool, one after another,
   * until one call completes without an rpc fault.
   *
   * For every client, the transports are (re)opened first if
   * reopenTransports is set. An exception accepted by rpcFaultFilter is
   * recorded as a fault for that client and collected; any other exception
   * is recorded as a success for the client and rethrown to the caller.
   *
   * Once the current range of clients is exhausted, the range is asked
   * whether it will become non-empty again; if so, this method sleeps for
   * the reported wait time (when positive) and retries, otherwise it gives
   * up.
   *
   * Throws: TException if the pool offers no clients at all,
   *   TCompoundOperationException carrying the rpc exceptions collected
   *   during the last pass if every client failed.
   */
  ResultType executeOnPool(ResultType)(scope ResultType delegate(Client) work) {
    auto clients = pool_[];
    if (clients.empty) {
      throw new TException("No clients available to try.");
    }

    while (true) {
      // Collected anew for every pass over the clients.
      Exception[] rpcExceptions;
      while (!clients.empty) {
        auto c = clients.front;
        clients.popFront;
        try {
          // Runs only if this try block is left without an exception, i.e.
          // when work(c) returned normally.
          scope (success) {
            pool_.recordSuccess(c);
          }

          if (reopenTransports) {
            c.inputProtocol.transport.open();
            c.outputProtocol.transport.open();
          }

          return work(c);
        } catch (Exception e) {
          if (rpcFaultFilter && rpcFaultFilter(e)) {
            pool_.recordFault(c);
            rpcExceptions ~= e;
          } else {
            // We are dealing with a normal exception thrown by the
            // server-side method, just pass it on. As far as we are
            // concerned, the method call succeeded.
            pool_.recordSuccess(c);
            throw e;
          }
        }
      }

      // If we get here, no client succeeded during the current iteration.
      Duration waitTime;
      Client dummy;
      if (clients.willBecomeNonempty(dummy, waitTime)) {
        if (waitTime > dur!"hnsecs"(0)) {
          import core.thread;
          Thread.sleep(waitTime);
        }
      } else {
        throw new TCompoundOperationException("All clients failed.",
          rpcExceptions);
      }
    }
  }

private:
  /// The underlying resource pool holding the clients.
  TResourcePool!Client pool_;
}
233
private {
  /*
   * Builds the D source code, as a string, for the forwarding methods that
   * are mixed into TClientPool: one method per member method name of
   * Interface, each passing its arguments on to the same-named method of a
   * pooled client by way of executeOnPool.
   *
   * This is a free function rather than an anonymous delegate literal because
   * the latter are not allowed in class scope.
   */
  string poolForwardCode(Interface)() {
    string generated = "";

    foreach (name; AllMemberMethodNames!Interface) {
      // Name of the interface method as seen from inside the mixin site.
      enum target = "Interface." ~ name;

      // Method header, mirroring return and parameter types of the target.
      string signature =
        "ReturnType!(" ~ target ~ ") " ~ name ~
        "(ParameterTypeTuple!(" ~ target ~ ") args) {\n";

      // Method body: hand the call to the pool.
      string forwardingCall =
        "return executeOnPool((Client c){ return c." ~ name ~ "(args); });\n";

      generated ~= signature ~ forwardingCall ~ "}\n";
    }

    return generated;
  }
}
252
/**
 * Convenience factory for TClientPool which lets the interface type be
 * deduced from the argument (IFTI) instead of having to be spelled out
 * explicitly at the construction site
 * (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
 *
 * Params:
 *   clients = The clients to put into the newly created pool.
 *
 * Returns: A new TClientPool!Interface containing the given clients.
 */
TClientPool!Interface tClientPool(Interface)(TClientBase!Interface[] clients)
  if (isService!Interface)
{
  auto pool = new TClientPool!Interface(clients);
  return pool;
}