]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/c_glib/arrow-glib/output-stream.cpp
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / c_glib / arrow-glib / output-stream.cpp
CommitLineData
1d09f67e
TL
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#include <arrow/io/memory.h>
21#include <arrow/ipc/writer.h>
22
23#include <arrow-glib/buffer.hpp>
24#include <arrow-glib/codec.hpp>
25#include <arrow-glib/error.hpp>
26#include <arrow-glib/file.hpp>
27#include <arrow-glib/ipc-options.hpp>
28#include <arrow-glib/output-stream.hpp>
29#include <arrow-glib/record-batch.hpp>
30#include <arrow-glib/tensor.hpp>
31#include <arrow-glib/writable.hpp>
32
33#include <iostream>
34#include <sstream>
35
36G_BEGIN_DECLS
37
38/**
39 * SECTION: output-stream
40 * @section_id: output-stream-classes
41 * @title: Output stream classes
42 * @include: arrow-glib/arrow-glib.h
43 *
44 * #GArrowOutputStream is an interface for stream output. Stream
45 * output is file based and writable.
46 *
47 * #GArrowFileOutputStream is a class for file output stream.
48 *
49 * #GArrowBufferOutputStream is a class for buffer output stream.
50 *
51 * #GArrowGIOOutputStream is a class for `GOutputStream` based output
52 * stream.
53 *
54 * #GArrowCompressedOutputStream is a class to write compressed data to
55 * output stream.
56 */
57
58typedef struct GArrowOutputStreamPrivate_ {
59 std::shared_ptr<arrow::io::OutputStream> output_stream;
60} GArrowOutputStreamPrivate;
61
62enum {
63 PROP_0,
64 PROP_OUTPUT_STREAM
65};
66
67static std::shared_ptr<arrow::io::FileInterface>
68garrow_output_stream_get_raw_file_interface(GArrowFile *file)
69{
70 auto output_stream = GARROW_OUTPUT_STREAM(file);
71 auto arrow_output_stream = garrow_output_stream_get_raw(output_stream);
72 return arrow_output_stream;
73}
74
75static void
76garrow_output_stream_file_interface_init(GArrowFileInterface *iface)
77{
78 iface->get_raw = garrow_output_stream_get_raw_file_interface;
79}
80
81static std::shared_ptr<arrow::io::Writable>
82garrow_output_stream_get_raw_writable_interface(GArrowWritable *writable)
83{
84 auto output_stream = GARROW_OUTPUT_STREAM(writable);
85 auto arrow_output_stream = garrow_output_stream_get_raw(output_stream);
86 return arrow_output_stream;
87}
88
89static void
90garrow_output_stream_writable_interface_init(GArrowWritableInterface *iface)
91{
92 iface->get_raw = garrow_output_stream_get_raw_writable_interface;
93}
94
95G_DEFINE_TYPE_WITH_CODE(GArrowOutputStream,
96 garrow_output_stream,
97 G_TYPE_OBJECT,
98 G_ADD_PRIVATE(GArrowOutputStream)
99 G_IMPLEMENT_INTERFACE(GARROW_TYPE_FILE,
100 garrow_output_stream_file_interface_init)
101 G_IMPLEMENT_INTERFACE(GARROW_TYPE_WRITABLE,
102 garrow_output_stream_writable_interface_init));
103
104#define GARROW_OUTPUT_STREAM_GET_PRIVATE(obj) \
105 static_cast<GArrowOutputStreamPrivate *>( \
106 garrow_output_stream_get_instance_private( \
107 GARROW_OUTPUT_STREAM(obj)))
108
109static void
110garrow_output_stream_finalize(GObject *object)
111{
112 auto priv = GARROW_OUTPUT_STREAM_GET_PRIVATE(object);
113
114 priv->output_stream.~shared_ptr();
115
116 G_OBJECT_CLASS(garrow_output_stream_parent_class)->finalize(object);
117}
118
119static void
120garrow_output_stream_set_property(GObject *object,
121 guint prop_id,
122 const GValue *value,
123 GParamSpec *pspec)
124{
125 auto priv = GARROW_OUTPUT_STREAM_GET_PRIVATE(object);
126
127 switch (prop_id) {
128 case PROP_OUTPUT_STREAM:
129 priv->output_stream =
130 *static_cast<std::shared_ptr<arrow::io::OutputStream> *>(g_value_get_pointer(value));
131 break;
132 default:
133 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
134 break;
135 }
136}
137
138static void
139garrow_output_stream_get_property(GObject *object,
140 guint prop_id,
141 GValue *value,
142 GParamSpec *pspec)
143{
144 switch (prop_id) {
145 default:
146 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
147 break;
148 }
149}
150
151static void
152garrow_output_stream_init(GArrowOutputStream *object)
153{
154 auto priv = GARROW_OUTPUT_STREAM_GET_PRIVATE(object);
155 new(&priv->output_stream) std::shared_ptr<arrow::io::OutputStream>;
156}
157
158static void
159garrow_output_stream_class_init(GArrowOutputStreamClass *klass)
160{
161 auto gobject_class = G_OBJECT_CLASS(klass);
162
163 gobject_class->finalize = garrow_output_stream_finalize;
164 gobject_class->set_property = garrow_output_stream_set_property;
165 gobject_class->get_property = garrow_output_stream_get_property;
166
167 GParamSpec *spec;
168 spec = g_param_spec_pointer("output-stream",
169 "io::OutputStream",
170 "The raw std::shared<arrow::io::OutputStream> *",
171 static_cast<GParamFlags>(G_PARAM_WRITABLE |
172 G_PARAM_CONSTRUCT_ONLY));
173 g_object_class_install_property(gobject_class, PROP_OUTPUT_STREAM, spec);
174}
175
176/**
177 * garrow_output_stream_align:
178 * @stream: A #GArrowOutputStream.
179 * @alignment: The byte multiple for the metadata prefix, usually 8
180 * or 64, to ensure the body starts on a multiple of that alignment.
181 * @error: (nullable): Return location for a #GError or %NULL.
182 *
183 * Returns: %TRUE on success, %FALSE on error.
184 *
185 * Since: 0.11.0
186 */
187gboolean
188garrow_output_stream_align(GArrowOutputStream *stream,
189 gint32 alignment,
190 GError **error)
191{
192 auto arrow_stream = garrow_output_stream_get_raw(stream);
193 auto status = arrow::ipc::AlignStream(arrow_stream.get(), alignment);
194 return garrow::check(error, status, "[output-stream][align]");
195}
196
197/**
198 * garrow_output_stream_write_tensor:
199 * @stream: A #GArrowOutputStream.
200 * @tensor: A #GArrowTensor to be written.
201 * @error: (nullable): Return location for a #GError or %NULL.
202 *
203 * Returns: The number of written bytes on success, -1 on error.
204 *
205 * Since: 0.4.0
206 */
207gint64
208garrow_output_stream_write_tensor(GArrowOutputStream *stream,
209 GArrowTensor *tensor,
210 GError **error)
211{
212 auto arrow_stream = garrow_output_stream_get_raw(stream);
213 auto arrow_tensor = garrow_tensor_get_raw(tensor);
214 int32_t metadata_length;
215 int64_t body_length;
216 auto status = arrow::ipc::WriteTensor(*arrow_tensor,
217 arrow_stream.get(),
218 &metadata_length,
219 &body_length);
220 if (garrow::check(error, status, "[output-stream][write-tensor]")) {
221 return metadata_length + body_length;
222 } else {
223 return -1;
224 }
225}
226
227/**
228 * garrow_output_stream_write_record_batch:
229 * @stream: A #GArrowOutputStream.
230 * @record_batch: A #GArrowRecordBatch to be written.
231 * @options: (nullable): A #GArrowWriteOptions.
232 * @error: (nullable): Return location for a #GError or %NULL.
233 *
234 * Returns: The number of written bytes on success, -1 on error.
235 *
236 * Since: 1.0.0
237 */
238gint64
239garrow_output_stream_write_record_batch(GArrowOutputStream *stream,
240 GArrowRecordBatch *record_batch,
241 GArrowWriteOptions *options,
242 GError **error)
243{
244 auto arrow_stream = garrow_output_stream_get_raw(stream);
245 auto arrow_record_batch = garrow_record_batch_get_raw(record_batch);
246 int64_t buffer_start_offset = 0;
247 int32_t metadata_length;
248 int64_t body_length;
249 arrow::Status status;
250 if (options) {
251 auto arrow_options = garrow_write_options_get_raw(options);
252 status = arrow::ipc::WriteRecordBatch(*arrow_record_batch,
253 buffer_start_offset,
254 arrow_stream.get(),
255 &metadata_length,
256 &body_length,
257 *arrow_options);
258 } else {
259 auto arrow_options = arrow::ipc::IpcWriteOptions::Defaults();
260 status = arrow::ipc::WriteRecordBatch(*arrow_record_batch,
261 buffer_start_offset,
262 arrow_stream.get(),
263 &metadata_length,
264 &body_length,
265 arrow_options);
266 }
267 if (garrow::check(error, status, "[output-stream][write-record-batch]")) {
268 return metadata_length + body_length;
269 } else {
270 return -1;
271 }
272}
273
274
275G_DEFINE_TYPE(GArrowFileOutputStream,
276 garrow_file_output_stream,
277 GARROW_TYPE_OUTPUT_STREAM);
278
279static void
280garrow_file_output_stream_init(GArrowFileOutputStream *file_output_stream)
281{
282}
283
284static void
285garrow_file_output_stream_class_init(GArrowFileOutputStreamClass *klass)
286{
287}
288
289/**
290 * garrow_file_output_stream_new:
291 * @path: The path of the file output stream.
292 * @append: Whether the path is opened as append mode or recreate mode.
293 * @error: (nullable): Return location for a #GError or %NULL.
294 *
295 * Returns: (nullable): A newly opened #GArrowFileOutputStream or
296 * %NULL on error.
297 */
298GArrowFileOutputStream *
299garrow_file_output_stream_new(const gchar *path,
300 gboolean append,
301 GError **error)
302{
303 auto arrow_file_output_stream_result =
304 arrow::io::FileOutputStream::Open(std::string(path), append);
305 if (arrow_file_output_stream_result.ok()) {
306 auto arrow_file_output_stream =
307 arrow_file_output_stream_result.ValueOrDie();
308 return garrow_file_output_stream_new_raw(&arrow_file_output_stream);
309 } else {
310 std::string context("[io][file-output-stream][open]: <");
311 context += path;
312 context += ">";
313 garrow::check(error, arrow_file_output_stream_result, context.c_str());
314 return NULL;
315 }
316}
317
318
319G_DEFINE_TYPE(GArrowBufferOutputStream,
320 garrow_buffer_output_stream,
321 GARROW_TYPE_OUTPUT_STREAM);
322
323static void
324garrow_buffer_output_stream_init(GArrowBufferOutputStream *buffer_output_stream)
325{
326}
327
328static void
329garrow_buffer_output_stream_class_init(GArrowBufferOutputStreamClass *klass)
330{
331}
332
333/**
334 * garrow_buffer_output_stream_new:
335 * @buffer: The resizable buffer to be output.
336 *
337 * Returns: (transfer full): A newly created #GArrowBufferOutputStream.
338 */
339GArrowBufferOutputStream *
340garrow_buffer_output_stream_new(GArrowResizableBuffer *buffer)
341{
342 auto arrow_buffer = garrow_buffer_get_raw(GARROW_BUFFER(buffer));
343 auto arrow_resizable_buffer =
344 std::static_pointer_cast<arrow::ResizableBuffer>(arrow_buffer);
345 auto arrow_buffer_output_stream =
346 std::make_shared<arrow::io::BufferOutputStream>(arrow_resizable_buffer);
347 return garrow_buffer_output_stream_new_raw(&arrow_buffer_output_stream);
348}
349
350G_END_DECLS
351
352
353namespace garrow {
354 class GIOOutputStream : public arrow::io::OutputStream {
355 public:
356 GIOOutputStream(GOutputStream *output_stream) :
357 output_stream_(output_stream),
358 position_(0) {
359 g_object_ref(output_stream_);
360 }
361
362 ~GIOOutputStream() {
363 g_object_unref(output_stream_);
364 }
365
366 GOutputStream *get_output_stream() {
367 return output_stream_;
368 }
369
370 bool closed() const override {
371 return static_cast<bool>(g_output_stream_is_closed(output_stream_));
372 }
373
374 arrow::Status Close() override {
375 GError *error = NULL;
376 if (g_output_stream_close(output_stream_, NULL, &error)) {
377 return arrow::Status::OK();
378 } else {
379 return garrow_error_to_status(error,
380 arrow::StatusCode::IOError,
381 "[gio-output-stream][close]");
382 }
383 }
384
385 arrow::Result<int64_t> Tell() const override {
386 if (G_IS_SEEKABLE(output_stream_)) {
387 return g_seekable_tell(G_SEEKABLE(output_stream_));
388 } else {
389 return position_;
390 }
391 }
392
393 arrow::Status Write(const void *data,
394 int64_t n_bytes) override {
395 GError *error = NULL;
396 gsize n_written_bytes;
397 auto successed = g_output_stream_write_all(output_stream_,
398 data,
399 n_bytes,
400 &n_written_bytes,
401 NULL,
402 &error);
403 if (successed) {
404 position_ += n_written_bytes;
405 return arrow::Status::OK();
406 } else {
407 std::stringstream message("[gio-output-stream][write]");
408 message << "[" << n_written_bytes << "/" << n_bytes << "]";
409 return garrow_error_to_status(error,
410 arrow::StatusCode::IOError,
411 message.str().c_str());
412 }
413 }
414
415 arrow::Status Flush() override {
416 GError *error = NULL;
417 auto successed = g_output_stream_flush(output_stream_, NULL, &error);
418 if (successed) {
419 return arrow::Status::OK();
420 } else {
421 return garrow_error_to_status(error,
422 arrow::StatusCode::IOError,
423 "[gio-output-stream][flush]");
424 }
425 }
426
427 private:
428 GOutputStream *output_stream_;
429 int64_t position_;
430 };
431};
432
433G_BEGIN_DECLS
434
435typedef struct GArrowGIOOutputStreamPrivate_ {
436 GOutputStream *raw;
437} GArrowGIOOutputStreamPrivate;
438
439enum {
440 PROP_GIO_RAW = 1
441};
442
443G_DEFINE_TYPE_WITH_PRIVATE(GArrowGIOOutputStream,
444 garrow_gio_output_stream,
445 GARROW_TYPE_OUTPUT_STREAM);
446
447#define GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(object) \
448 static_cast<GArrowGIOOutputStreamPrivate *>( \
449 garrow_gio_output_stream_get_instance_private( \
450 GARROW_GIO_OUTPUT_STREAM(object)))
451
452static void
453garrow_gio_output_stream_dispose(GObject *object)
454{
455 auto priv = GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(object);
456
457 if (priv->raw) {
458 g_object_unref(priv->raw);
459 priv->raw = nullptr;
460 }
461
462 G_OBJECT_CLASS(garrow_gio_output_stream_parent_class)->dispose(object);
463}
464
465static void
466garrow_gio_output_stream_set_property(GObject *object,
467 guint prop_id,
468 const GValue *value,
469 GParamSpec *pspec)
470{
471 auto priv = GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(object);
472
473 switch (prop_id) {
474 case PROP_GIO_RAW:
475 priv->raw = G_OUTPUT_STREAM(g_value_dup_object(value));
476 break;
477 default:
478 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
479 break;
480 }
481}
482
483static void
484garrow_gio_output_stream_get_property(GObject *object,
485 guint prop_id,
486 GValue *value,
487 GParamSpec *pspec)
488{
489 auto priv = GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(object);
490
491 switch (prop_id) {
492 case PROP_GIO_RAW:
493 g_value_set_object(value, priv->raw);
494 break;
495 default:
496 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
497 break;
498 }
499}
500
501static void
502garrow_gio_output_stream_init(GArrowGIOOutputStream *object)
503{
504}
505
506static void
507garrow_gio_output_stream_class_init(GArrowGIOOutputStreamClass *klass)
508{
509 auto gobject_class = G_OBJECT_CLASS(klass);
510
511 gobject_class->dispose = garrow_gio_output_stream_dispose;
512 gobject_class->set_property = garrow_gio_output_stream_set_property;
513 gobject_class->get_property = garrow_gio_output_stream_get_property;
514
515 GParamSpec *spec;
516 spec = g_param_spec_object("raw",
517 "Raw",
518 "The raw GOutputStream *",
519 G_TYPE_OUTPUT_STREAM,
520 static_cast<GParamFlags>(G_PARAM_READWRITE |
521 G_PARAM_CONSTRUCT_ONLY));
522 g_object_class_install_property(gobject_class, PROP_GIO_RAW, spec);
523}
524
525/**
526 * garrow_gio_output_stream_new:
527 * @gio_output_stream: The stream to be output.
528 *
529 * Returns: (transfer full): A newly created #GArrowGIOOutputStream.
530 */
531GArrowGIOOutputStream *
532garrow_gio_output_stream_new(GOutputStream *gio_output_stream)
533{
534 auto arrow_output_stream =
535 std::make_shared<garrow::GIOOutputStream>(gio_output_stream);
536 auto object = g_object_new(GARROW_TYPE_GIO_OUTPUT_STREAM,
537 "output-stream", &arrow_output_stream,
538 "raw", gio_output_stream,
539 NULL);
540 auto output_stream = GARROW_GIO_OUTPUT_STREAM(object);
541 return output_stream;
542}
543
544/**
545 * garrow_gio_output_stream_get_raw:
546 * @output_stream: A #GArrowGIOOutputStream.
547 *
548 * Returns: (transfer none): The wrapped #GOutputStream.
549 *
550 * Since: 0.5.0
551 *
552 * Deprecated: 0.12.0: Use GArrowGIOOutputStream::raw property instead.
553 */
554GOutputStream *
555garrow_gio_output_stream_get_raw(GArrowGIOOutputStream *output_stream)
556{
557 auto priv = GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(output_stream);
558 return priv->raw;
559}
560
561typedef struct GArrowCompressedOutputStreamPrivate_ {
562 GArrowCodec *codec;
563 GArrowOutputStream *raw;
564} GArrowCompressedOutputStreamPrivate;
565
566enum {
567 PROP_CODEC = 1,
568 PROP_RAW
569};
570
571G_DEFINE_TYPE_WITH_PRIVATE(GArrowCompressedOutputStream,
572 garrow_compressed_output_stream,
573 GARROW_TYPE_OUTPUT_STREAM)
574
575#define GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object) \
576 static_cast<GArrowCompressedOutputStreamPrivate *>( \
577 garrow_compressed_output_stream_get_instance_private( \
578 GARROW_COMPRESSED_OUTPUT_STREAM(object)))
579
580static void
581garrow_compressed_output_stream_dispose(GObject *object)
582{
583 auto priv = GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object);
584
585 if (priv->codec) {
586 g_object_unref(priv->codec);
587 priv->codec = NULL;
588 }
589
590 if (priv->raw) {
591 g_object_unref(priv->raw);
592 priv->raw = NULL;
593 }
594
595 G_OBJECT_CLASS(garrow_compressed_output_stream_parent_class)->dispose(object);
596}
597
598static void
599garrow_compressed_output_stream_set_property(GObject *object,
600 guint prop_id,
601 const GValue *value,
602 GParamSpec *pspec)
603{
604 auto priv = GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object);
605
606 switch (prop_id) {
607 case PROP_CODEC:
608 priv->codec = GARROW_CODEC(g_value_dup_object(value));
609 break;
610 case PROP_RAW:
611 priv->raw = GARROW_OUTPUT_STREAM(g_value_dup_object(value));
612 break;
613 default:
614 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
615 break;
616 }
617}
618
619static void
620garrow_compressed_output_stream_get_property(GObject *object,
621 guint prop_id,
622 GValue *value,
623 GParamSpec *pspec)
624{
625 auto priv = GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object);
626
627 switch (prop_id) {
628 case PROP_CODEC:
629 g_value_set_object(value, priv->codec);
630 break;
631 case PROP_RAW:
632 g_value_set_object(value, priv->raw);
633 break;
634 default:
635 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
636 break;
637 }
638}
639
640static void
641garrow_compressed_output_stream_init(GArrowCompressedOutputStream *object)
642{
643}
644
645static void
646garrow_compressed_output_stream_class_init(GArrowCompressedOutputStreamClass *klass)
647{
648 auto gobject_class = G_OBJECT_CLASS(klass);
649
650 gobject_class->dispose = garrow_compressed_output_stream_dispose;
651 gobject_class->set_property = garrow_compressed_output_stream_set_property;
652 gobject_class->get_property = garrow_compressed_output_stream_get_property;
653
654 GParamSpec *spec;
655 spec = g_param_spec_object("codec",
656 "Codec",
657 "The codec for the stream",
658 GARROW_TYPE_CODEC,
659 static_cast<GParamFlags>(G_PARAM_READWRITE |
660 G_PARAM_CONSTRUCT_ONLY));
661 g_object_class_install_property(gobject_class, PROP_CODEC, spec);
662
663 spec = g_param_spec_object("raw",
664 "Raw",
665 "The underlying raw output stream",
666 GARROW_TYPE_OUTPUT_STREAM,
667 static_cast<GParamFlags>(G_PARAM_READWRITE |
668 G_PARAM_CONSTRUCT_ONLY));
669 g_object_class_install_property(gobject_class, PROP_RAW, spec);
670}
671
672/**
673 * garrow_compressed_output_stream_new:
674 * @codec: A #GArrowCodec for compressed data in the @raw.
675 * @raw: A #GArrowOutputStream that is a sink for compressed data.
676 * @error: (nullable): Return location for a #GError or %NULL.
677 *
678 * Returns: A newly created #GArrowCompressedOutputStream.
679 *
680 * Since: 0.12.0
681 */
682GArrowCompressedOutputStream *
683garrow_compressed_output_stream_new(GArrowCodec *codec,
684 GArrowOutputStream *raw,
685 GError **error)
686{
687 auto arrow_codec = garrow_codec_get_raw(codec).get();
688 auto arrow_raw = garrow_output_stream_get_raw(raw);
689 auto arrow_stream = arrow::io::CompressedOutputStream::Make(arrow_codec,
690 arrow_raw);
691 if (garrow::check(error, arrow_stream, "[compressed-output-stream][new]")) {
692 return garrow_compressed_output_stream_new_raw(&(arrow_stream.ValueOrDie()),
693 codec,
694 raw);
695 } else {
696 return NULL;
697 }
698}
699
700G_END_DECLS
701
702
703GArrowOutputStream *
704garrow_output_stream_new_raw(std::shared_ptr<arrow::io::OutputStream> *arrow_output_stream)
705{
706 auto output_stream =
707 GARROW_OUTPUT_STREAM(g_object_new(GARROW_TYPE_OUTPUT_STREAM,
708 "output-stream", arrow_output_stream,
709 NULL));
710 return output_stream;
711}
712
713std::shared_ptr<arrow::io::OutputStream>
714garrow_output_stream_get_raw(GArrowOutputStream *output_stream)
715{
716 auto priv = GARROW_OUTPUT_STREAM_GET_PRIVATE(output_stream);
717 return priv->output_stream;
718}
719
720
721GArrowFileOutputStream *
722garrow_file_output_stream_new_raw(std::shared_ptr<arrow::io::FileOutputStream> *arrow_file_output_stream)
723{
724 auto file_output_stream =
725 GARROW_FILE_OUTPUT_STREAM(g_object_new(GARROW_TYPE_FILE_OUTPUT_STREAM,
726 "output-stream", arrow_file_output_stream,
727 NULL));
728 return file_output_stream;
729}
730
731GArrowBufferOutputStream *
732garrow_buffer_output_stream_new_raw(std::shared_ptr<arrow::io::BufferOutputStream> *arrow_buffer_output_stream)
733{
734 auto buffer_output_stream =
735 GARROW_BUFFER_OUTPUT_STREAM(g_object_new(GARROW_TYPE_BUFFER_OUTPUT_STREAM,
736 "output-stream", arrow_buffer_output_stream,
737 NULL));
738 return buffer_output_stream;
739}
740
741GArrowCompressedOutputStream *
742garrow_compressed_output_stream_new_raw(std::shared_ptr<arrow::io::CompressedOutputStream> *arrow_raw,
743 GArrowCodec *codec,
744 GArrowOutputStream *raw)
745{
746 auto compressed_output_stream =
747 g_object_new(GARROW_TYPE_COMPRESSED_OUTPUT_STREAM,
748 "output-stream", arrow_raw,
749 "codec", codec,
750 "raw", raw,
751 NULL);
752 return GARROW_COMPRESSED_OUTPUT_STREAM(compressed_output_stream);
753}
754
755std::shared_ptr<arrow::io::OutputStream>
756garrow_compressed_output_stream_get_raw(GArrowCompressedOutputStream *compressed_output_stream)
757{
758 auto output_stream = GARROW_OUTPUT_STREAM(compressed_output_stream);
759 auto arrow_output_stream = garrow_output_stream_get_raw(output_stream);
760 auto arrow_compressed_output_stream =
761 std::static_pointer_cast<arrow::io::CompressedOutputStream>(arrow_output_stream);
762 return arrow_compressed_output_stream->raw();
763}