2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
22 #include <thrift/c_glib/thrift.h>
23 #include <thrift/c_glib/processor/thrift_processor.h>
24 #include <thrift/c_glib/processor/thrift_multiplexed_processor.h>
25 #include <thrift/c_glib/protocol/thrift_multiplexed_protocol.h>
26 #include <thrift/c_glib/protocol/thrift_stored_message_protocol.h>
27 #include <thrift/c_glib/thrift_application_exception.h>
/* Register the ThriftMultiplexedProcessor GType with THRIFT_TYPE_PROCESSOR as
 * its parent type. */
29 G_DEFINE_TYPE(ThriftMultiplexedProcessor
, thrift_multiplexed_processor
, THRIFT_TYPE_PROCESSOR
)
/* Property ids. The default-service-name property starts at 1; the _END value
 * is used below as the size of the param-spec table and, in class_init, as the
 * count handed to g_object_class_install_properties(). */
34 PROP_THRIFT_MULTIPLEXED_PROCESSOR_DEFAULT_SERVICE_NAME
= 1,
35 PROP_THRIFT_MULTIPLEXED_PROCESSOR_END
/* Param specs indexed by property id. Starts out all NULL; the entry for the
 * default service name is filled in by class_init. */
38 static GParamSpec
*thrift_multiplexed_processor_obj_properties
[PROP_THRIFT_MULTIPLEXED_PROCESSOR_END
] = { NULL
, };
/*
 * Default implementation of the register_processor class method.
 *
 * Stores multiplexed_processor in self->multiplexed_services under a private
 * copy of multiplexed_processor_name, taking a new reference on the processor.
 * If no default service name has been set yet, the name being registered
 * becomes the default.
 *
 * processor                  - the ThriftMultiplexedProcessor being configured
 * multiplexed_processor_name - service name used as the lookup key
 * multiplexed_processor      - processor that will serve that service name
 * error                      - not used by the statements visible here
 *
 * NOTE(review): the return statement is not visible in this listing; confirm
 * the returned value against the full source.
 */
42 thrift_multiplexed_processor_register_processor_impl(ThriftProcessor
*processor
, const gchar
* multiplexed_processor_name
, ThriftProcessor
* multiplexed_processor
, GError
**error
)
44 ThriftMultiplexedProcessor
*self
= THRIFT_MULTIPLEXED_PROCESSOR(processor
);
/* Insert or replace the entry: the table receives its own copy of the key and
 * its own reference on the processor. */
45 g_hash_table_replace(self
->multiplexed_services
,
46 g_strdup(multiplexed_processor_name
),
47 g_object_ref (multiplexed_processor
));
49 /* Make first registered become default */
50 if(!self
->default_processor_name
){
51 self
->default_processor_name
= g_strdup(multiplexed_processor_name
);
/*
 * process() implementation for ThriftMultiplexedProcessor.
 *
 * From the statements visible here, the function:
 *   - clears any error left in *error by an earlier call (logging it first);
 *   - reads the message header from `in` via read_message_begin;
 *   - rejects message types other than T_CALL and T_ONEWAY;
 *   - splits the received name on THRIFT_MULTIPLEXED_PROTOCOL_DEFAULT_SEPARATOR
 *     with strtok_r, looks a token up in self->multiplexed_services and builds
 *     a ThriftStoredMessageProtocol for the call;
 *   - falls back to self->default_processor_name when no processor and no
 *     stored protocol were selected and token_index is 1;
 *   - hands the stored protocol to the selected processor's process() and
 *     keeps its result in retval;
 *   - otherwise skips the incoming struct, finishes the read, and writes a
 *     ThriftApplicationException (UNKNOWN_METHOD) to `out`, then flushes.
 *
 * processor - the ThriftMultiplexedProcessor
 * in        - protocol the request is read from
 * out       - protocol the reply / exception is written to
 * error     - GError location; cleared on entry if already set
 *
 * NOTE(review): many source lines of this function (some declarations, the
 * switch/else scaffolding, several call arguments and the final return) are
 * not visible in this listing; the summary above covers only what is shown
 * and should be confirmed against the full source.
 */
58 thrift_multiplexed_processor_process_impl (ThriftProcessor
*processor
, ThriftProtocol
*in
,
59 ThriftProtocol
*out
, GError
**error
)
61 gboolean retval
= FALSE
;
62 gboolean token_error
= FALSE
;
63 ThriftApplicationException
*xception
;
64 ThriftStoredMessageProtocol
*stored_message_protocol
= NULL
;
65 ThriftMessageType message_type
;
66 ThriftMultiplexedProcessor
*self
= THRIFT_MULTIPLEXED_PROCESSOR(processor
);
67 ThriftProcessor
*multiplexed_processor
= NULL
;
68 ThriftTransport
*transport
;
75 /* FIXME It seems that previous processor is not managing error correctly */
77 g_debug ("thrift_multiplexed_processor: last error not removed: %s",
78 *error
!= NULL
? (*error
)->message
: "(null)");
79 g_clear_error (error
);
/* Read the message header; fname is tokenized in place further down. */
83 THRIFT_PROTOCOL_GET_CLASS(in
)->read_message_begin(in
, &fname
, &message_type
, &seqid
, error
);
/* Only calls and one-way calls are accepted by this processor. */
85 if(!(message_type
== T_CALL
|| message_type
== T_ONEWAY
)) {
87 THRIFT_MULTIPLEXED_PROCESSOR_ERROR
,
88 THRIFT_MULTIPLEXED_PROCESSOR_ERROR_MESSAGE_TYPE
,
89 "message type invalid for this processor");
91 /* Split by the token */
92 for (token
= strtok_r(fname
, THRIFT_MULTIPLEXED_PROTOCOL_DEFAULT_SEPARATOR
, &state
),
94 token
!= NULL
&& !token_error
;
95 token
= strtok_r(NULL
, THRIFT_MULTIPLEXED_PROTOCOL_DEFAULT_SEPARATOR
, &state
),
100 /* It should be the service name */
101 multiplexed_processor
= g_hash_table_lookup(self
->multiplexed_services
, token
);
102 if(multiplexed_processor
==NULL
){
107 /* It should be the function name */
108 stored_message_protocol
= g_object_new (THRIFT_TYPE_STORED_MESSAGE_PROTOCOL
,
111 "type", message_type
,
117 THRIFT_MULTIPLEXED_PROCESSOR_ERROR
,
118 THRIFT_MULTIPLEXED_PROCESSOR_ERROR_MESSAGE_WRONGLY_MULTIPLEXED
,
119 "the message has more tokens than expected!");
/* Fallback: nothing was selected above and token_index is 1, so route the
 * call to the default service if one is configured. */
125 if(!stored_message_protocol
&&
126 !multiplexed_processor
&&
127 token_index
==1 && self
->default_processor_name
){
128 /* It should be the service name */
129 multiplexed_processor
= g_hash_table_lookup(self
->multiplexed_services
, self
->default_processor_name
);
130 if(multiplexed_processor
==NULL
){
132 THRIFT_MULTIPLEXED_PROCESSOR_ERROR
,
133 THRIFT_MULTIPLEXED_PROCESSOR_ERROR_SERVICE_UNAVAILABLE
,
134 "service %s not available on this processor",
135 self
->default_processor_name
);
137 /* Set the message name to the original name */
138 stored_message_protocol
= g_object_new (THRIFT_TYPE_STORED_MESSAGE_PROTOCOL
,
141 "type", message_type
,
/* Dispatch to the selected processor, reading through the stored-message
 * protocol instead of `in` directly. */
148 if(stored_message_protocol
!=NULL
&& multiplexed_processor
!=NULL
){
149 retval
= THRIFT_PROCESSOR_GET_CLASS (multiplexed_processor
)->process (multiplexed_processor
, (ThriftProtocol
*) stored_message_protocol
, out
, error
) ;
153 THRIFT_MULTIPLEXED_PROCESSOR_ERROR
,
154 THRIFT_MULTIPLEXED_PROCESSOR_ERROR_SERVICE_UNAVAILABLE
,
155 "service %s is not multiplexed in this processor",
163 /* By default, return an application exception to the client indicating the
164 method name is not recognized. */
165 /* Copied from dispach processor */
167 if ((thrift_protocol_skip (in
, T_STRUCT
, error
) < 0) ||
168 (thrift_protocol_read_message_end (in
, error
) < 0))
171 g_object_get (in
, "transport", &transport
, NULL
);
172 result
= thrift_transport_read_end (transport
, error
);
173 g_object_unref (transport
);
175 /* We must free fname */
180 if (thrift_protocol_write_message_begin (out
,
185 /* We must free fname */
193 g_object_new (THRIFT_TYPE_APPLICATION_EXCEPTION
,
194 "type", THRIFT_APPLICATION_EXCEPTION_ERROR_UNKNOWN_METHOD
,
195 "message", (*error
)->message
,
197 result
= thrift_struct_write (THRIFT_STRUCT (xception
),
200 g_object_unref (xception
);
202 (thrift_protocol_write_message_end (out
, error
) < 0))
205 g_object_get (out
, "transport", &transport
, NULL
);
207 ((thrift_transport_write_end (transport
, error
) >= 0) &&
208 (thrift_transport_flush (transport
, error
) >= 0));
209 g_object_unref (transport
);
211 /* The protocol now has a copy we can free it */
217 FIXME This makes everything fail, I don't know why.
218 if(stored_message_protocol!=NULL){
219 // After its use we must free it
220 g_object_unref(stored_message_protocol);
226 /* Define the GError domain for the Thrift multiplexed processor: returns the
 * quark registered for THRIFT_MULTIPLEXED_PROCESSOR_ERROR_DOMAIN. */
228 thrift_multiplexed_processor_error_quark (void)
230 return g_quark_from_static_string (THRIFT_MULTIPLEXED_PROCESSOR_ERROR_DOMAIN
);
235 thrift_multiplexed_processor_set_property (GObject
*object
,
240 ThriftMultiplexedProcessor
*self
= THRIFT_MULTIPLEXED_PROCESSOR (object
);
244 case PROP_THRIFT_MULTIPLEXED_PROCESSOR_DEFAULT_SERVICE_NAME
:
245 self
->default_processor_name
= g_value_dup_string (value
);
249 /* We don't have any other property... */
250 G_OBJECT_WARN_INVALID_PROPERTY_ID (object
, property_id
, pspec
);
256 thrift_multiplexed_processor_get_property (GObject
*object
,
261 ThriftMultiplexedProcessor
*self
= THRIFT_MULTIPLEXED_PROCESSOR (object
);
265 case PROP_THRIFT_MULTIPLEXED_PROCESSOR_DEFAULT_SERVICE_NAME
:
266 g_value_set_string (value
, self
->default_processor_name
);
270 /* We don't have any other property... */
271 G_OBJECT_WARN_INVALID_PROPERTY_ID (object
, property_id
, pspec
);
278 thrift_multiplexed_processor_finalize (GObject
*object
)
280 ThriftMultiplexedProcessor
*self
= THRIFT_MULTIPLEXED_PROCESSOR(object
);
282 /* Free our multiplexed hash table */
283 g_hash_table_unref (self
->multiplexed_services
);
284 self
->multiplexed_services
= NULL
;
286 if(self
->default_processor_name
){
287 g_free(self
->default_processor_name
);
288 self
->default_processor_name
=NULL
;
291 /* Chain up to parent */
292 if (G_OBJECT_CLASS (thrift_multiplexed_processor_parent_class
)->finalize
)
293 (*G_OBJECT_CLASS (thrift_multiplexed_processor_parent_class
)->finalize
) (object
);
296 /* Class initializer for ThriftMultiplexedProcessor.
 *
 * Installs process_impl as the ThriftProcessor process method, hooks up the
 * GObject set_property / get_property / finalize handlers, sets the default
 * register_processor implementation, and installs the readable and writable
 * string property named "default".
 *
 * cls - the class structure being initialized
 */
298 thrift_multiplexed_processor_class_init (ThriftMultiplexedProcessorClass
*cls
)
/* Override the parent ThriftProcessor's process method. */
301 THRIFT_PROCESSOR_CLASS(cls
)->process
= thrift_multiplexed_processor_process_impl
;
302 GObjectClass
*gobject_class
= G_OBJECT_CLASS (cls
);
/* GObject property and lifecycle handlers. */
305 gobject_class
->set_property
= thrift_multiplexed_processor_set_property
;
306 gobject_class
->get_property
= thrift_multiplexed_processor_get_property
;
307 gobject_class
->finalize
= thrift_multiplexed_processor_finalize
;
/* Class method of this type; reached through the public
 * thrift_multiplexed_processor_register_processor() entry point. */
310 cls
->register_processor
= thrift_multiplexed_processor_register_processor_impl
;
/* Describe the "default" property.
 * NOTE(review): at least one argument of this call is not visible in this
 * listing; confirm the default value against the full source. */
313 thrift_multiplexed_processor_obj_properties
[PROP_THRIFT_MULTIPLEXED_PROCESSOR_DEFAULT_SERVICE_NAME
] =
314 g_param_spec_string ("default",
315 "Default service name the protocol points to where no multiplexed client used",
316 "Set the default service name",
318 (G_PARAM_READWRITE
));
/* Install every spec in the table in one call. */
320 g_object_class_install_properties (gobject_class
,
321 PROP_THRIFT_MULTIPLEXED_PROCESSOR_END
,
322 thrift_multiplexed_processor_obj_properties
);
/*
 * Instance initializer for ThriftMultiplexedProcessor.
 *
 * Creates the hash table that holds the registered services and starts with
 * no default service name.
 *
 * self - the instance being initialized
 *
 * NOTE(review): the arguments passed to g_hash_table_new_full() are not
 * visible in this listing; confirm the hash, equality and destroy functions
 * against the full source.
 */
327 thrift_multiplexed_processor_init (ThriftMultiplexedProcessor
*self
)
330 /* Create our multiplexed services hash table */
331 self
->multiplexed_services
= g_hash_table_new_full (
/* No default service until one is set explicitly or a processor is
 * registered. */
336 self
->default_processor_name
= NULL
;
/*
 * Register a processor under a service name.
 *
 * Forwards all arguments unchanged to the register_processor method found in
 * the class of `processor`, and returns whatever that method returns.
 *
 * processor                  - the ThriftMultiplexedProcessor to register on
 * multiplexed_processor_name - service name to register under
 * multiplexed_processor      - processor that serves that name
 * error                      - GError location handed to the class method
 */
341 thrift_multiplexed_processor_register_processor(ThriftProcessor
*processor
, const gchar
* multiplexed_processor_name
, ThriftProcessor
* multiplexed_processor
, GError
**error
)
343 return THRIFT_MULTIPLEXED_PROCESSOR_GET_CLASS(processor
)->register_processor(processor
, multiplexed_processor_name
, multiplexed_processor
, error
);