]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/c_glib/src/thrift/c_glib/processor/thrift_multiplexed_processor.c
68a0f4d46cb6713695de564b1752314faae91ecc
[ceph.git] / ceph / src / jaegertracing / thrift / lib / c_glib / src / thrift / c_glib / processor / thrift_multiplexed_processor.c
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 #include <string.h>
20 #include <stdio.h>
21
22 #include <thrift/c_glib/thrift.h>
23 #include <thrift/c_glib/processor/thrift_processor.h>
24 #include <thrift/c_glib/processor/thrift_multiplexed_processor.h>
25 #include <thrift/c_glib/protocol/thrift_multiplexed_protocol.h>
26 #include <thrift/c_glib/protocol/thrift_stored_message_protocol.h>
27 #include <thrift/c_glib/thrift_application_exception.h>
28
/* Register ThriftMultiplexedProcessor with the GObject type system as a
 * subclass of THRIFT_TYPE_PROCESSOR. */
G_DEFINE_TYPE(ThriftMultiplexedProcessor, thrift_multiplexed_processor, THRIFT_TYPE_PROCESSOR)


/* Property identifiers. Numbering starts at 1, so slot 0 of the GParamSpec
 * table below is never assigned; the END value doubles as the table size. */
enum
{
  PROP_THRIFT_MULTIPLEXED_PROCESSOR_DEFAULT_SERVICE_NAME = 1,
  PROP_THRIFT_MULTIPLEXED_PROCESSOR_END
};

/* GParamSpec table, indexed by the property identifiers above. It is filled
 * in by the class initializer and then passed to
 * g_object_class_install_properties(). */
static GParamSpec *thrift_multiplexed_processor_obj_properties[PROP_THRIFT_MULTIPLEXED_PROCESSOR_END] = { NULL, };
39
40
41 static gboolean
42 thrift_multiplexed_processor_register_processor_impl(ThriftProcessor *processor, const gchar * multiplexed_processor_name, ThriftProcessor * multiplexed_processor , GError **error)
43 {
44 ThriftMultiplexedProcessor *self = THRIFT_MULTIPLEXED_PROCESSOR(processor);
45 g_hash_table_replace(self->multiplexed_services,
46 g_strdup(multiplexed_processor_name),
47 g_object_ref (multiplexed_processor));
48
49 /* Make first registered become default */
50 if(!self->default_processor_name){
51 self->default_processor_name = g_strdup(multiplexed_processor_name);
52 }
53 return TRUE;
54 }
55
56
57 static gboolean
58 thrift_multiplexed_processor_process_impl (ThriftProcessor *processor, ThriftProtocol *in,
59 ThriftProtocol *out, GError **error)
60 {
61 gboolean retval = FALSE;
62 gboolean token_error = FALSE;
63 ThriftApplicationException *xception;
64 ThriftStoredMessageProtocol *stored_message_protocol = NULL;
65 ThriftMessageType message_type;
66 ThriftMultiplexedProcessor *self = THRIFT_MULTIPLEXED_PROCESSOR(processor);
67 ThriftProcessor *multiplexed_processor = NULL;
68 ThriftTransport *transport;
69 char *token=NULL;
70 int token_index=0;
71 char *state=NULL;
72 gchar *fname=NULL;
73 gint32 seqid, result;
74
75 /* FIXME It seems that previous processor is not managing error correctly */
76 if(*error!=NULL) {
77 g_debug ("thrift_multiplexed_processor: last error not removed: %s",
78 *error != NULL ? (*error)->message : "(null)");
79 g_clear_error (error);
80 }
81
82
83 THRIFT_PROTOCOL_GET_CLASS(in)->read_message_begin(in, &fname, &message_type, &seqid, error);
84
85 if(!(message_type == T_CALL || message_type == T_ONEWAY)) {
86 g_set_error (error,
87 THRIFT_MULTIPLEXED_PROCESSOR_ERROR,
88 THRIFT_MULTIPLEXED_PROCESSOR_ERROR_MESSAGE_TYPE,
89 "message type invalid for this processor");
90 }else{
91 /* Split by the token */
92 for (token = strtok_r(fname, THRIFT_MULTIPLEXED_PROTOCOL_DEFAULT_SEPARATOR, &state),
93 token_index=0;
94 token != NULL && !token_error;
95 token = strtok_r(NULL, THRIFT_MULTIPLEXED_PROTOCOL_DEFAULT_SEPARATOR, &state),
96 token_index++)
97 {
98 switch(token_index){
99 case 0:
100 /* It should be the service name */
101 multiplexed_processor = g_hash_table_lookup(self->multiplexed_services, token);
102 if(multiplexed_processor==NULL){
103 token_error=TRUE;
104 }
105 break;
106 case 1:
107 /* It should be the function name */
108 stored_message_protocol = g_object_new (THRIFT_TYPE_STORED_MESSAGE_PROTOCOL,
109 "protocol", in,
110 "name", token,
111 "type", message_type,
112 "seqid", seqid,
113 NULL);
114 break;
115 default:
116 g_set_error (error,
117 THRIFT_MULTIPLEXED_PROCESSOR_ERROR,
118 THRIFT_MULTIPLEXED_PROCESSOR_ERROR_MESSAGE_WRONGLY_MULTIPLEXED,
119 "the message has more tokens than expected!");
120 token_error=TRUE;
121 break;
122 }
123 }
124 /* Set default */
125 if(!stored_message_protocol &&
126 !multiplexed_processor &&
127 token_index==1 && self->default_processor_name){
128 /* It should be the service name */
129 multiplexed_processor = g_hash_table_lookup(self->multiplexed_services, self->default_processor_name);
130 if(multiplexed_processor==NULL){
131 g_set_error (error,
132 THRIFT_MULTIPLEXED_PROCESSOR_ERROR,
133 THRIFT_MULTIPLEXED_PROCESSOR_ERROR_SERVICE_UNAVAILABLE,
134 "service %s not available on this processor",
135 self->default_processor_name);
136 }else{
137 /* Set the message name to the original name */
138 stored_message_protocol = g_object_new (THRIFT_TYPE_STORED_MESSAGE_PROTOCOL,
139 "protocol", in,
140 "name", fname,
141 "type", message_type,
142 "seqid", seqid,
143 NULL);
144 }
145
146 }
147
148 if(stored_message_protocol!=NULL && multiplexed_processor!=NULL){
149 retval = THRIFT_PROCESSOR_GET_CLASS (multiplexed_processor)->process (multiplexed_processor, (ThriftProtocol *) stored_message_protocol, out, error) ;
150 }else{
151 if(!error)
152 g_set_error (error,
153 THRIFT_MULTIPLEXED_PROCESSOR_ERROR,
154 THRIFT_MULTIPLEXED_PROCESSOR_ERROR_SERVICE_UNAVAILABLE,
155 "service %s is not multiplexed in this processor",
156 fname);
157 }
158
159
160 }
161
162 if(!retval){
163 /* By default, return an application exception to the client indicating the
164 method name is not recognized. */
165 /* Copied from dispach processor */
166
167 if ((thrift_protocol_skip (in, T_STRUCT, error) < 0) ||
168 (thrift_protocol_read_message_end (in, error) < 0))
169 return retval;
170
171 g_object_get (in, "transport", &transport, NULL);
172 result = thrift_transport_read_end (transport, error);
173 g_object_unref (transport);
174 if (result < 0) {
175 /* We must free fname */
176 g_free(fname);
177 return retval;
178 }
179
180 if (thrift_protocol_write_message_begin (out,
181 fname,
182 T_EXCEPTION,
183 seqid,
184 error) < 0){
185 /* We must free fname */
186 g_free(fname);
187
188 return retval;
189 }
190
191
192 xception =
193 g_object_new (THRIFT_TYPE_APPLICATION_EXCEPTION,
194 "type", THRIFT_APPLICATION_EXCEPTION_ERROR_UNKNOWN_METHOD,
195 "message", (*error)->message,
196 NULL);
197 result = thrift_struct_write (THRIFT_STRUCT (xception),
198 out,
199 error);
200 g_object_unref (xception);
201 if ((result < 0) ||
202 (thrift_protocol_write_message_end (out, error) < 0))
203 return retval;
204
205 g_object_get (out, "transport", &transport, NULL);
206 retval =
207 ((thrift_transport_write_end (transport, error) >= 0) &&
208 (thrift_transport_flush (transport, error) >= 0));
209 g_object_unref (transport);
210 }else{
211 /* The protocol now has a copy we can free it */
212 g_free(fname);
213
214 }
215
216 /*
217 FIXME This makes everything fail, I don't know why.
218 if(stored_message_protocol!=NULL){
219 // After its use we must free it
220 g_object_unref(stored_message_protocol);
221 }
222 */
223 return retval;
224 }
225
226 /* define the GError domain for Thrift transports */
227 GQuark
228 thrift_multiplexed_processor_error_quark (void)
229 {
230 return g_quark_from_static_string (THRIFT_MULTIPLEXED_PROCESSOR_ERROR_DOMAIN);
231 }
232
233
234 static void
235 thrift_multiplexed_processor_set_property (GObject *object,
236 guint property_id,
237 const GValue *value,
238 GParamSpec *pspec)
239 {
240 ThriftMultiplexedProcessor *self = THRIFT_MULTIPLEXED_PROCESSOR (object);
241
242 switch (property_id)
243 {
244 case PROP_THRIFT_MULTIPLEXED_PROCESSOR_DEFAULT_SERVICE_NAME:
245 self->default_processor_name = g_value_dup_string (value);
246 break;
247
248 default:
249 /* We don't have any other property... */
250 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
251 break;
252 }
253 }
254
255 static void
256 thrift_multiplexed_processor_get_property (GObject *object,
257 guint property_id,
258 GValue *value,
259 GParamSpec *pspec)
260 {
261 ThriftMultiplexedProcessor *self = THRIFT_MULTIPLEXED_PROCESSOR (object);
262
263 switch (property_id)
264 {
265 case PROP_THRIFT_MULTIPLEXED_PROCESSOR_DEFAULT_SERVICE_NAME:
266 g_value_set_string (value, self->default_processor_name);
267 break;
268
269 default:
270 /* We don't have any other property... */
271 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
272 break;
273 }
274 }
275
276 /* destructor */
277 static void
278 thrift_multiplexed_processor_finalize (GObject *object)
279 {
280 ThriftMultiplexedProcessor *self = THRIFT_MULTIPLEXED_PROCESSOR(object);
281
282 /* Free our multiplexed hash table */
283 g_hash_table_unref (self->multiplexed_services);
284 self->multiplexed_services = NULL;
285
286 if(self->default_processor_name){
287 g_free(self->default_processor_name);
288 self->default_processor_name=NULL;
289 }
290
291 /* Chain up to parent */
292 if (G_OBJECT_CLASS (thrift_multiplexed_processor_parent_class)->finalize)
293 (*G_OBJECT_CLASS (thrift_multiplexed_processor_parent_class)->finalize) (object);
294 }
295
296 /* class initializer for ThriftMultiplexedProcessor */
297 static void
298 thrift_multiplexed_processor_class_init (ThriftMultiplexedProcessorClass *cls)
299 {
300 /* Override */
301 THRIFT_PROCESSOR_CLASS(cls)->process = thrift_multiplexed_processor_process_impl;
302 GObjectClass *gobject_class = G_OBJECT_CLASS (cls);
303
304 /* Object methods */
305 gobject_class->set_property = thrift_multiplexed_processor_set_property;
306 gobject_class->get_property = thrift_multiplexed_processor_get_property;
307 gobject_class->finalize = thrift_multiplexed_processor_finalize;
308
309 /* Class methods */
310 cls->register_processor = thrift_multiplexed_processor_register_processor_impl;
311
312
313 thrift_multiplexed_processor_obj_properties[PROP_THRIFT_MULTIPLEXED_PROCESSOR_DEFAULT_SERVICE_NAME] =
314 g_param_spec_string ("default",
315 "Default service name the protocol points to where no multiplexed client used",
316 "Set the default service name",
317 NULL,
318 (G_PARAM_READWRITE));
319
320 g_object_class_install_properties (gobject_class,
321 PROP_THRIFT_MULTIPLEXED_PROCESSOR_END,
322 thrift_multiplexed_processor_obj_properties);
323
324 }
325
326 static void
327 thrift_multiplexed_processor_init (ThriftMultiplexedProcessor *self)
328 {
329
330 /* Create our multiplexed services hash table */
331 self->multiplexed_services = g_hash_table_new_full (
332 g_str_hash,
333 g_str_equal,
334 g_free,
335 g_object_unref);
336 self->default_processor_name = NULL;
337 }
338
339
340 gboolean
341 thrift_multiplexed_processor_register_processor(ThriftProcessor *processor, const gchar * multiplexed_processor_name, ThriftProcessor * multiplexed_processor , GError **error)
342 {
343 return THRIFT_MULTIPLEXED_PROCESSOR_GET_CLASS(processor)->register_processor(processor, multiplexed_processor_name, multiplexed_processor, error);
344 }
345
346