]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/c_glib/arrow-flight-glib/server.cpp
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / c_glib / arrow-flight-glib / server.cpp
CommitLineData
1d09f67e
TL
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#include <arrow/util/make_unique.h>
21
22#include <arrow-glib/arrow-glib.hpp>
23
24#include <arrow-flight-glib/common.hpp>
25#include <arrow-flight-glib/server.hpp>
26
27G_BEGIN_DECLS
28
29/**
30 * SECTION: server
31 * @section_id: server
32 * @title: Server related classes
33 * @include: arrow-flight-glib/arrow-flight-glib.h
34 *
35 * #GAFlightDataStream is a class for producing a sequence of IPC
36 * payloads to be sent in `FlightData` protobuf messages. Generally,
37 * this is not used directly. Generally, #GAFlightRecordBatchStream is
38 * used instead.
39 *
40 * #GAFlightRecordBatchStream is a class for producing a sequence of
41 * IPC payloads to be sent in `FlightData` protobuf messages by
42 * #GArrowRecordBatchReader`.
43 *
44 * #GAFlightServerOptions is a class for options of each server.
45 *
46 * #GAFlightServerCallContext is a class for context of each server call.
47 *
48 * #GAFlightServer is a class to develop an Apache Arrow Flight server.
49 *
50 * Since: 5.0.0
51 */
52
53
54typedef struct GAFlightDataStreamPrivate_ {
55 arrow::flight::FlightDataStream *stream;
56} GAFlightDataStreamPrivate;
57
58enum {
59 PROP_STREAM = 1,
60};
61
62G_DEFINE_TYPE_WITH_PRIVATE(GAFlightDataStream,
63 gaflight_data_stream,
64 G_TYPE_OBJECT)
65
66#define GAFLIGHT_DATA_STREAM_GET_PRIVATE(obj) \
67 static_cast<GAFlightDataStreamPrivate *>( \
68 gaflight_data_stream_get_instance_private( \
69 GAFLIGHT_DATA_STREAM(obj)))
70
71static void
72gaflight_data_stream_finalize(GObject *object)
73{
74 auto priv = GAFLIGHT_DATA_STREAM_GET_PRIVATE(object);
75
76 delete priv->stream;
77
78 G_OBJECT_CLASS(gaflight_data_stream_parent_class)->finalize(object);
79}
80
81static void
82gaflight_data_stream_set_property(GObject *object,
83 guint prop_id,
84 const GValue *value,
85 GParamSpec *pspec)
86{
87 auto priv = GAFLIGHT_DATA_STREAM_GET_PRIVATE(object);
88
89 switch (prop_id) {
90 case PROP_STREAM:
91 priv->stream = static_cast<arrow::flight::FlightDataStream *>(
92 g_value_get_pointer(value));
93 break;
94 default:
95 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
96 break;
97 }
98}
99
100static void
101gaflight_data_stream_init(GAFlightDataStream *object)
102{
103}
104
105static void
106gaflight_data_stream_class_init(GAFlightDataStreamClass *klass)
107{
108 auto gobject_class = G_OBJECT_CLASS(klass);
109
110 gobject_class->finalize = gaflight_data_stream_finalize;
111 gobject_class->set_property = gaflight_data_stream_set_property;
112
113 GParamSpec *spec;
114 spec = g_param_spec_pointer("stream",
115 "Stream",
116 "The raw arrow::flight::FlightDataStream *",
117 static_cast<GParamFlags>(G_PARAM_WRITABLE |
118 G_PARAM_CONSTRUCT_ONLY));
119 g_object_class_install_property(gobject_class, PROP_STREAM, spec);
120}
121
122
123typedef struct GAFlightRecordBatchStreamPrivate_ {
124 GArrowRecordBatchReader *reader;
125} GAFlightRecordBatchStreamPrivate;
126
127enum {
128 PROP_READER = 1,
129};
130
131G_DEFINE_TYPE_WITH_PRIVATE(GAFlightRecordBatchStream,
132 gaflight_record_batch_stream,
133 GAFLIGHT_TYPE_DATA_STREAM)
134
135#define GAFLIGHT_RECORD_BATCH_STREAM_GET_PRIVATE(obj) \
136 static_cast<GAFlightRecordBatchStreamPrivate *>( \
137 gaflight_record_batch_stream_get_instance_private( \
138 GAFLIGHT_RECORD_BATCH_STREAM(obj)))
139
140static void
141gaflight_record_batch_stream_dispose(GObject *object)
142{
143 auto priv = GAFLIGHT_RECORD_BATCH_STREAM_GET_PRIVATE(object);
144
145 if (priv->reader) {
146 g_object_unref(priv->reader);
147 priv->reader = NULL;
148 }
149
150 G_OBJECT_CLASS(gaflight_record_batch_stream_parent_class)->dispose(object);
151}
152
153static void
154gaflight_record_batch_stream_set_property(GObject *object,
155 guint prop_id,
156 const GValue *value,
157 GParamSpec *pspec)
158{
159 auto priv = GAFLIGHT_RECORD_BATCH_STREAM_GET_PRIVATE(object);
160
161 switch (prop_id) {
162 case PROP_READER:
163 priv->reader = GARROW_RECORD_BATCH_READER(g_value_dup_object(value));
164 break;
165 default:
166 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
167 break;
168 }
169}
170
171static void
172gaflight_record_batch_stream_get_property(GObject *object,
173 guint prop_id,
174 GValue *value,
175 GParamSpec *pspec)
176{
177 auto priv = GAFLIGHT_RECORD_BATCH_STREAM_GET_PRIVATE(object);
178
179 switch (prop_id) {
180 case PROP_READER:
181 g_value_set_object(value, priv->reader);
182 break;
183 default:
184 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
185 break;
186 }
187}
188
189static void
190gaflight_record_batch_stream_init(GAFlightRecordBatchStream *object)
191{
192}
193
194static void
195gaflight_record_batch_stream_class_init(GAFlightRecordBatchStreamClass *klass)
196{
197 auto gobject_class = G_OBJECT_CLASS(klass);
198
199 gobject_class->dispose = gaflight_record_batch_stream_dispose;
200 gobject_class->set_property = gaflight_record_batch_stream_set_property;
201 gobject_class->get_property = gaflight_record_batch_stream_get_property;
202
203 GParamSpec *spec;
204 /**
205 * GAFlightRecordBatchStream:reader:
206 *
207 * The reader that produces record batches.
208 *
209 * Since: 6.0.0
210 */
211 spec = g_param_spec_object("reader",
212 "Reader",
213 "The reader that produces record batches",
214 GARROW_TYPE_RECORD_BATCH_READER,
215 static_cast<GParamFlags>(G_PARAM_READWRITE |
216 G_PARAM_CONSTRUCT_ONLY));
217 g_object_class_install_property(gobject_class, PROP_READER, spec);
218}
219
220/**
221 * gaflight_record_batch_stream_new:
222 * @reader: A #GArrowRecordBatchReader to be read.
223 * @options: (nullable): A #GArrowWriteOptions for writing record batches to
224 * a client.
225 *
226 * Returns: The newly created #GAFlightRecordBatchStream.
227 *
228 * Since: 6.0.0
229 */
230GAFlightRecordBatchStream *
231gaflight_record_batch_stream_new(GArrowRecordBatchReader *reader,
232 GArrowWriteOptions *options)
233{
234 auto arrow_reader = garrow_record_batch_reader_get_raw(reader);
235 auto arrow_options_default = arrow::ipc::IpcWriteOptions::Defaults();
236 arrow::ipc::IpcWriteOptions *arrow_options = NULL;
237 if (options) {
238 arrow_options = garrow_write_options_get_raw(options);
239 } else {
240 arrow_options = &arrow_options_default;
241 }
242 auto stream = arrow::internal::make_unique<
243 arrow::flight::RecordBatchStream>(arrow_reader, *arrow_options);
244 return static_cast<GAFlightRecordBatchStream *>(
245 g_object_new(GAFLIGHT_TYPE_RECORD_BATCH_STREAM,
246 "stream", stream.release(),
247 "reader", reader,
248 NULL));
249}
250
251
252typedef struct GAFlightServerOptionsPrivate_ {
253 arrow::flight::FlightServerOptions options;
254 GAFlightLocation *location;
255} GAFlightServerOptionsPrivate;
256
257enum {
258 PROP_LOCATION = 1,
259};
260
261G_DEFINE_TYPE_WITH_PRIVATE(GAFlightServerOptions,
262 gaflight_server_options,
263 G_TYPE_OBJECT)
264
265#define GAFLIGHT_SERVER_OPTIONS_GET_PRIVATE(obj) \
266 static_cast<GAFlightServerOptionsPrivate *>( \
267 gaflight_server_options_get_instance_private( \
268 GAFLIGHT_SERVER_OPTIONS(obj)))
269
270static void
271gaflight_server_options_dispose(GObject *object)
272{
273 auto priv = GAFLIGHT_SERVER_OPTIONS_GET_PRIVATE(object);
274
275 if (priv->location) {
276 g_object_unref(priv->location);
277 priv->location = NULL;
278 }
279
280 G_OBJECT_CLASS(gaflight_server_options_parent_class)->dispose(object);
281}
282
283static void
284gaflight_server_options_finalize(GObject *object)
285{
286 auto priv = GAFLIGHT_SERVER_OPTIONS_GET_PRIVATE(object);
287
288 priv->options.~FlightServerOptions();
289
290 G_OBJECT_CLASS(gaflight_server_options_parent_class)->finalize(object);
291}
292
293static void
294gaflight_server_options_set_property(GObject *object,
295 guint prop_id,
296 const GValue *value,
297 GParamSpec *pspec)
298{
299 auto priv = GAFLIGHT_SERVER_OPTIONS_GET_PRIVATE(object);
300
301 switch (prop_id) {
302 case PROP_LOCATION:
303 {
304 priv->location = GAFLIGHT_LOCATION(g_value_dup_object(value));
305 auto flight_location = gaflight_location_get_raw(priv->location);
306 new(&(priv->options)) arrow::flight::FlightServerOptions(*flight_location);
307 }
308 break;
309 default:
310 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
311 break;
312 }
313}
314
315static void
316gaflight_server_options_get_property(GObject *object,
317 guint prop_id,
318 GValue *value,
319 GParamSpec *pspec)
320{
321 auto priv = GAFLIGHT_SERVER_OPTIONS_GET_PRIVATE(object);
322
323 switch (prop_id) {
324 case PROP_LOCATION:
325 g_value_set_object(value, priv->location);
326 break;
327 default:
328 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
329 break;
330 }
331}
332
333static void
334gaflight_server_options_init(GAFlightServerOptions *object)
335{
336}
337
338static void
339gaflight_server_options_class_init(GAFlightServerOptionsClass *klass)
340{
341 auto gobject_class = G_OBJECT_CLASS(klass);
342
343 gobject_class->dispose = gaflight_server_options_dispose;
344 gobject_class->finalize = gaflight_server_options_finalize;
345 gobject_class->set_property = gaflight_server_options_set_property;
346 gobject_class->get_property = gaflight_server_options_get_property;
347
348 GParamSpec *spec;
349 spec = g_param_spec_object("location",
350 "Location",
351 "The location to be listened",
352 GAFLIGHT_TYPE_LOCATION,
353 static_cast<GParamFlags>(G_PARAM_READWRITE |
354 G_PARAM_CONSTRUCT_ONLY));
355 g_object_class_install_property(gobject_class, PROP_LOCATION, spec);
356}
357
358/**
359 * gaflight_server_options_new:
360 * @location: A #GAFlightLocation to be listened.
361 *
362 * Returns: The newly created options for a server.
363 *
364 * Since: 5.0.0
365 */
366GAFlightServerOptions *
367gaflight_server_options_new(GAFlightLocation *location)
368{
369 return static_cast<GAFlightServerOptions *>(
370 g_object_new(GAFLIGHT_TYPE_SERVER_OPTIONS,
371 "location", location,
372 NULL));
373}
374
375
376typedef struct GAFlightServerCallContextPrivate_ {
377 arrow::flight::ServerCallContext *call_context;
378} GAFlightServerCallContextPrivate;
379
380enum {
381 PROP_CALL_CONTEXT = 1,
382};
383
384G_DEFINE_TYPE_WITH_PRIVATE(GAFlightServerCallContext,
385 gaflight_server_call_context,
386 G_TYPE_OBJECT)
387
388#define GAFLIGHT_SERVER_CALL_CONTEXT_GET_PRIVATE(obj) \
389 static_cast<GAFlightServerCallContextPrivate *>( \
390 gaflight_server_call_context_get_instance_private( \
391 GAFLIGHT_SERVER_CALL_CONTEXT(obj)))
392
393static void
394gaflight_server_call_context_set_property(GObject *object,
395 guint prop_id,
396 const GValue *value,
397 GParamSpec *pspec)
398{
399 auto priv = GAFLIGHT_SERVER_CALL_CONTEXT_GET_PRIVATE(object);
400
401 switch (prop_id) {
402 case PROP_CALL_CONTEXT:
403 priv->call_context =
404 static_cast<arrow::flight::ServerCallContext *>(
405 g_value_get_pointer(value));
406 break;
407 default:
408 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
409 break;
410 }
411}
412
413static void
414gaflight_server_call_context_init(GAFlightServerCallContext *object)
415{
416}
417
418static void
419gaflight_server_call_context_class_init(GAFlightServerCallContextClass *klass)
420{
421 auto gobject_class = G_OBJECT_CLASS(klass);
422
423 gobject_class->set_property = gaflight_server_call_context_set_property;
424
425 GParamSpec *spec;
426 spec = g_param_spec_pointer("call-context",
427 "Call context",
428 "The raw arrow::flight::ServerCallContext",
429 static_cast<GParamFlags>(G_PARAM_WRITABLE |
430 G_PARAM_CONSTRUCT_ONLY));
431 g_object_class_install_property(gobject_class, PROP_CALL_CONTEXT, spec);
432}
433
434
435G_END_DECLS
436namespace gaflight {
437 class DataStream : public arrow::flight::FlightDataStream {
438 public:
439 DataStream(GAFlightDataStream *gastream) :
440 arrow::flight::FlightDataStream(),
441 gastream_(gastream) {
442 }
443
444 ~DataStream() override {
445 g_object_unref(gastream_);
446 }
447
448 std::shared_ptr<arrow::Schema> schema() override {
449 auto stream = gaflight_data_stream_get_raw(gastream_);
450 return stream->schema();
451 }
452
453 arrow::Status GetSchemaPayload(
454 arrow::flight::FlightPayload *payload) override {
455 auto stream = gaflight_data_stream_get_raw(gastream_);
456 return stream->GetSchemaPayload(payload);
457 }
458
459 arrow::Status Next(arrow::flight::FlightPayload *payload) override {
460 auto stream = gaflight_data_stream_get_raw(gastream_);
461 return stream->Next(payload);
462 }
463
464 private:
465 GAFlightDataStream *gastream_;
466 };
467
468 class Server : public arrow::flight::FlightServerBase {
469 public:
470 Server(GAFlightServer *gaserver) : gaserver_(gaserver) {
471 }
472
473 arrow::Status
474 ListFlights(
475 const arrow::flight::ServerCallContext &context,
476 const arrow::flight::Criteria *criteria,
477 std::unique_ptr<arrow::flight::FlightListing> *listing) override {
478 auto gacontext = gaflight_server_call_context_new_raw(&context);
479 GAFlightCriteria *gacriteria = NULL;
480 if (criteria) {
481 gacriteria = gaflight_criteria_new_raw(criteria);
482 }
483 GError *gerror = NULL;
484 auto gaflights = gaflight_server_list_flights(gaserver_,
485 gacontext,
486 gacriteria,
487 &gerror);
488 if (gacriteria) {
489 g_object_unref(gacriteria);
490 }
491 g_object_unref(gacontext);
492 if (gerror) {
493 return garrow_error_to_status(gerror,
494 arrow::StatusCode::UnknownError,
495 "[flight-server][list-flights]");
496 }
497 std::vector<arrow::flight::FlightInfo> flights;
498 for (auto node = gaflights; node; node = node->next) {
499 auto gaflight = GAFLIGHT_INFO(node->data);
500 flights.push_back(*gaflight_info_get_raw(gaflight));
501 g_object_unref(gaflight);
502 }
503 g_list_free(gaflights);
504 *listing = arrow::internal::make_unique<
505 arrow::flight::SimpleFlightListing>(flights);
506 return arrow::Status::OK();
507 }
508
509 arrow::Status DoGet(
510 const arrow::flight::ServerCallContext &context,
511 const arrow::flight::Ticket &ticket,
512 std::unique_ptr<arrow::flight::FlightDataStream> *stream) override {
513 auto gacontext = gaflight_server_call_context_new_raw(&context);
514 auto gaticket = gaflight_ticket_new_raw(&ticket);
515 GError *gerror = NULL;
516 auto gastream = gaflight_server_do_get(gaserver_,
517 gacontext,
518 gaticket,
519 &gerror);
520 g_object_unref(gaticket);
521 g_object_unref(gacontext);
522 if (gerror) {
523 return garrow_error_to_status(gerror,
524 arrow::StatusCode::UnknownError,
525 "[flight-server][do-get]");
526 }
527 *stream = arrow::internal::make_unique<DataStream>(gastream);
528 return arrow::Status::OK();
529 }
530
531 private:
532 GAFlightServer *gaserver_;
533 };
534};
535G_BEGIN_DECLS
536
537typedef struct GAFlightServerPrivate_ {
538 gaflight::Server server;
539} GAFlightServerPrivate;
540
541G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE(GAFlightServer,
542 gaflight_server,
543 G_TYPE_OBJECT)
544
545#define GAFLIGHT_SERVER_GET_PRIVATE(obj) \
546 static_cast<GAFlightServerPrivate *>( \
547 gaflight_server_get_instance_private( \
548 GAFLIGHT_SERVER(obj)))
549
550static void
551gaflight_server_finalize(GObject *object)
552{
553 auto priv = GAFLIGHT_SERVER_GET_PRIVATE(object);
554
555 priv->server.~Server();
556
557 G_OBJECT_CLASS(gaflight_server_parent_class)->finalize(object);
558}
559
560static void
561gaflight_server_init(GAFlightServer *object)
562{
563 auto priv = GAFLIGHT_SERVER_GET_PRIVATE(object);
564 new(&(priv->server)) gaflight::Server(object);
565}
566
567static void
568gaflight_server_class_init(GAFlightServerClass *klass)
569{
570 auto gobject_class = G_OBJECT_CLASS(klass);
571
572 gobject_class->finalize = gaflight_server_finalize;
573}
574
575/**
576 * gaflight_server_listen:
577 * @server: A #GAFlightServer.
578 * @options: A #GAFlightServerOptions.
579 * @error: (nullable): Return location for a #GError or %NULL.
580 *
581 * Returns: %TRUE on success, %FALSE on error.
582 *
583 * Since: 5.0.0
584 */
585gboolean
586gaflight_server_listen(GAFlightServer *server,
587 GAFlightServerOptions *options,
588 GError **error)
589{
590 auto flight_server = gaflight_server_get_raw(server);
591 const auto flight_options = gaflight_server_options_get_raw(options);
592 return garrow::check(error,
593 flight_server->Init(*flight_options),
594 "[flight-server][listen]");
595}
596
597/**
598 * gaflight_server_new:
599 * @server: A #GAFlightServer.
600 *
601 * Returns: The port number listening.
602 *
603 * Since: 5.0.0
604 */
605gint
606gaflight_server_get_port(GAFlightServer *server)
607{
608 const auto flight_server = gaflight_server_get_raw(server);
609 return flight_server->port();
610}
611
612/**
613 * gaflight_server_shutdown:
614 * @server: A #GAFlightServer.
615 * @error: (nullable): Return location for a #GError or %NULL.
616 *
617 * Shuts down the serve. This function can be called from signal
618 * handler or another thread.
619 *
620 * Returns: %TRUE on success, %FALSE on error.
621 *
622 * Since: 5.0.0
623 */
624gboolean
625gaflight_server_shutdown(GAFlightServer *server,
626 GError **error)
627{
628 auto flight_server = gaflight_server_get_raw(server);
629 return garrow::check(error,
630 flight_server->Shutdown(),
631 "[flight-server][shutdown]");
632}
633
634/**
635 * gaflight_server_list_flights:
636 * @server: A #GAFlightServer.
637 * @context: A #GAFlightServerCallContext.
638 * @criteria: (nullable): A #GAFlightCriteria.
639 * @error: (nullable): Return location for a #GError or %NULL.
640 *
641 * Returns: (element-type GAFlightInfo) (transfer full):
642 * #GList of #GAFlightInfo on success, %NULL on error.
643 *
644 * Since: 5.0.0
645 */
646GList *
647gaflight_server_list_flights(GAFlightServer *server,
648 GAFlightServerCallContext *context,
649 GAFlightCriteria *criteria,
650 GError **error)
651{
652 auto klass = GAFLIGHT_SERVER_GET_CLASS(server);
653 if (!(klass && klass->list_flights)) {
654 g_set_error(error,
655 GARROW_ERROR,
656 GARROW_ERROR_NOT_IMPLEMENTED,
657 "not implemented");
658 return NULL;
659 }
660 return (*(klass->list_flights))(server, context, criteria, error);
661}
662
663/**
664 * gaflight_server_do_get:
665 * @server: A #GAFlightServer.
666 * @context: A #GAFlightServerCallContext.
667 * @ticket: A #GAFlightTicket.
668 * @error: (nullable): Return location for a #GError or %NULL.
669 *
670 * Returns: (transfer full): #GAFlightDataStream on success, %NULL on error.
671 *
672 * Since: 6.0.0
673 */
674GAFlightDataStream *
675gaflight_server_do_get(GAFlightServer *server,
676 GAFlightServerCallContext *context,
677 GAFlightTicket *ticket,
678 GError **error)
679{
680 auto klass = GAFLIGHT_SERVER_GET_CLASS(server);
681 if (!(klass && klass->do_get)) {
682 g_set_error(error,
683 GARROW_ERROR,
684 GARROW_ERROR_NOT_IMPLEMENTED,
685 "not implemented");
686 return NULL;
687 }
688 return (*(klass->do_get))(server, context, ticket, error);
689}
690
691
692G_END_DECLS
693
694
695arrow::flight::FlightDataStream *
696gaflight_data_stream_get_raw(GAFlightDataStream *stream)
697{
698 auto priv = GAFLIGHT_DATA_STREAM_GET_PRIVATE(stream);
699 return priv->stream;
700}
701
702arrow::flight::FlightServerOptions *
703gaflight_server_options_get_raw(GAFlightServerOptions *options)
704{
705 auto priv = GAFLIGHT_SERVER_OPTIONS_GET_PRIVATE(options);
706 return &(priv->options);
707}
708
709GAFlightServerCallContext *
710gaflight_server_call_context_new_raw(
711 const arrow::flight::ServerCallContext *call_context)
712{
713 return GAFLIGHT_SERVER_CALL_CONTEXT(
714 g_object_new(GAFLIGHT_TYPE_SERVER_CALL_CONTEXT,
715 "call-context", call_context,
716 NULL));
717}
718
719arrow::flight::FlightServerBase *
720gaflight_server_get_raw(GAFlightServer *server)
721{
722 auto priv = GAFLIGHT_SERVER_GET_PRIVATE(server);
723 return &(priv->server);
724}