]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | ||
20 | #include <arrow/io/memory.h> | |
21 | #include <arrow/ipc/writer.h> | |
22 | ||
23 | #include <arrow-glib/buffer.hpp> | |
24 | #include <arrow-glib/codec.hpp> | |
25 | #include <arrow-glib/error.hpp> | |
26 | #include <arrow-glib/file.hpp> | |
27 | #include <arrow-glib/ipc-options.hpp> | |
28 | #include <arrow-glib/output-stream.hpp> | |
29 | #include <arrow-glib/record-batch.hpp> | |
30 | #include <arrow-glib/tensor.hpp> | |
31 | #include <arrow-glib/writable.hpp> | |
32 | ||
33 | #include <iostream> | |
34 | #include <sstream> | |
35 | ||
36 | G_BEGIN_DECLS | |
37 | ||
38 | /** | |
39 | * SECTION: output-stream | |
40 | * @section_id: output-stream-classes | |
41 | * @title: Output stream classes | |
42 | * @include: arrow-glib/arrow-glib.h | |
43 | * | |
44 | * #GArrowOutputStream is an interface for stream output. Stream | |
45 | * output is file based and writable | |
46 | * | |
47 | * #GArrowFileOutputStream is a class for file output stream. | |
48 | * | |
49 | * #GArrowBufferOutputStream is a class for buffer output stream. | |
50 | * | |
51 | * #GArrowGIOOutputStream is a class for `GOutputStream` based output | |
52 | * stream. | |
53 | * | |
54 | * #GArrowCompressedOutputStream is a class to write compressed data to | |
55 | * output stream. | |
56 | */ | |
57 | ||
58 | typedef struct GArrowOutputStreamPrivate_ { | |
59 | std::shared_ptr<arrow::io::OutputStream> output_stream; | |
60 | } GArrowOutputStreamPrivate; | |
61 | ||
62 | enum { | |
63 | PROP_0, | |
64 | PROP_OUTPUT_STREAM | |
65 | }; | |
66 | ||
67 | static std::shared_ptr<arrow::io::FileInterface> | |
68 | garrow_output_stream_get_raw_file_interface(GArrowFile *file) | |
69 | { | |
70 | auto output_stream = GARROW_OUTPUT_STREAM(file); | |
71 | auto arrow_output_stream = garrow_output_stream_get_raw(output_stream); | |
72 | return arrow_output_stream; | |
73 | } | |
74 | ||
75 | static void | |
76 | garrow_output_stream_file_interface_init(GArrowFileInterface *iface) | |
77 | { | |
78 | iface->get_raw = garrow_output_stream_get_raw_file_interface; | |
79 | } | |
80 | ||
81 | static std::shared_ptr<arrow::io::Writable> | |
82 | garrow_output_stream_get_raw_writable_interface(GArrowWritable *writable) | |
83 | { | |
84 | auto output_stream = GARROW_OUTPUT_STREAM(writable); | |
85 | auto arrow_output_stream = garrow_output_stream_get_raw(output_stream); | |
86 | return arrow_output_stream; | |
87 | } | |
88 | ||
89 | static void | |
90 | garrow_output_stream_writable_interface_init(GArrowWritableInterface *iface) | |
91 | { | |
92 | iface->get_raw = garrow_output_stream_get_raw_writable_interface; | |
93 | } | |
94 | ||
95 | G_DEFINE_TYPE_WITH_CODE(GArrowOutputStream, | |
96 | garrow_output_stream, | |
97 | G_TYPE_OBJECT, | |
98 | G_ADD_PRIVATE(GArrowOutputStream) | |
99 | G_IMPLEMENT_INTERFACE(GARROW_TYPE_FILE, | |
100 | garrow_output_stream_file_interface_init) | |
101 | G_IMPLEMENT_INTERFACE(GARROW_TYPE_WRITABLE, | |
102 | garrow_output_stream_writable_interface_init)); | |
103 | ||
104 | #define GARROW_OUTPUT_STREAM_GET_PRIVATE(obj) \ | |
105 | static_cast<GArrowOutputStreamPrivate *>( \ | |
106 | garrow_output_stream_get_instance_private( \ | |
107 | GARROW_OUTPUT_STREAM(obj))) | |
108 | ||
109 | static void | |
110 | garrow_output_stream_finalize(GObject *object) | |
111 | { | |
112 | auto priv = GARROW_OUTPUT_STREAM_GET_PRIVATE(object); | |
113 | ||
114 | priv->output_stream.~shared_ptr(); | |
115 | ||
116 | G_OBJECT_CLASS(garrow_output_stream_parent_class)->finalize(object); | |
117 | } | |
118 | ||
119 | static void | |
120 | garrow_output_stream_set_property(GObject *object, | |
121 | guint prop_id, | |
122 | const GValue *value, | |
123 | GParamSpec *pspec) | |
124 | { | |
125 | auto priv = GARROW_OUTPUT_STREAM_GET_PRIVATE(object); | |
126 | ||
127 | switch (prop_id) { | |
128 | case PROP_OUTPUT_STREAM: | |
129 | priv->output_stream = | |
130 | *static_cast<std::shared_ptr<arrow::io::OutputStream> *>(g_value_get_pointer(value)); | |
131 | break; | |
132 | default: | |
133 | G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); | |
134 | break; | |
135 | } | |
136 | } | |
137 | ||
138 | static void | |
139 | garrow_output_stream_get_property(GObject *object, | |
140 | guint prop_id, | |
141 | GValue *value, | |
142 | GParamSpec *pspec) | |
143 | { | |
144 | switch (prop_id) { | |
145 | default: | |
146 | G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); | |
147 | break; | |
148 | } | |
149 | } | |
150 | ||
151 | static void | |
152 | garrow_output_stream_init(GArrowOutputStream *object) | |
153 | { | |
154 | auto priv = GARROW_OUTPUT_STREAM_GET_PRIVATE(object); | |
155 | new(&priv->output_stream) std::shared_ptr<arrow::io::OutputStream>; | |
156 | } | |
157 | ||
158 | static void | |
159 | garrow_output_stream_class_init(GArrowOutputStreamClass *klass) | |
160 | { | |
161 | auto gobject_class = G_OBJECT_CLASS(klass); | |
162 | ||
163 | gobject_class->finalize = garrow_output_stream_finalize; | |
164 | gobject_class->set_property = garrow_output_stream_set_property; | |
165 | gobject_class->get_property = garrow_output_stream_get_property; | |
166 | ||
167 | GParamSpec *spec; | |
168 | spec = g_param_spec_pointer("output-stream", | |
169 | "io::OutputStream", | |
170 | "The raw std::shared<arrow::io::OutputStream> *", | |
171 | static_cast<GParamFlags>(G_PARAM_WRITABLE | | |
172 | G_PARAM_CONSTRUCT_ONLY)); | |
173 | g_object_class_install_property(gobject_class, PROP_OUTPUT_STREAM, spec); | |
174 | } | |
175 | ||
176 | /** | |
177 | * garrow_output_stream_align: | |
178 | * @stream: A #GArrowOutputStream. | |
179 | * @alignment: The byte multiple for the metadata prefix, usually 8 | |
180 | * or 64, to ensure the body starts on a multiple of that alignment. | |
181 | * @error: (nullable): Return location for a #GError or %NULL. | |
182 | * | |
183 | * Returns: %TRUE on success, %FALSE on error. | |
184 | * | |
185 | * Since: 0.11.0 | |
186 | */ | |
187 | gboolean | |
188 | garrow_output_stream_align(GArrowOutputStream *stream, | |
189 | gint32 alignment, | |
190 | GError **error) | |
191 | { | |
192 | auto arrow_stream = garrow_output_stream_get_raw(stream); | |
193 | auto status = arrow::ipc::AlignStream(arrow_stream.get(), alignment); | |
194 | return garrow::check(error, status, "[output-stream][align]"); | |
195 | } | |
196 | ||
197 | /** | |
198 | * garrow_output_stream_write_tensor: | |
199 | * @stream: A #GArrowOutputStream. | |
200 | * @tensor: A #GArrowTensor to be written. | |
201 | * @error: (nullable): Return location for a #GError or %NULL. | |
202 | * | |
203 | * Returns: The number of written bytes on success, -1 on error. | |
204 | * | |
205 | * Since: 0.4.0 | |
206 | */ | |
207 | gint64 | |
208 | garrow_output_stream_write_tensor(GArrowOutputStream *stream, | |
209 | GArrowTensor *tensor, | |
210 | GError **error) | |
211 | { | |
212 | auto arrow_stream = garrow_output_stream_get_raw(stream); | |
213 | auto arrow_tensor = garrow_tensor_get_raw(tensor); | |
214 | int32_t metadata_length; | |
215 | int64_t body_length; | |
216 | auto status = arrow::ipc::WriteTensor(*arrow_tensor, | |
217 | arrow_stream.get(), | |
218 | &metadata_length, | |
219 | &body_length); | |
220 | if (garrow::check(error, status, "[output-stream][write-tensor]")) { | |
221 | return metadata_length + body_length; | |
222 | } else { | |
223 | return -1; | |
224 | } | |
225 | } | |
226 | ||
227 | /** | |
228 | * garrow_output_stream_write_record_batch: | |
229 | * @stream: A #GArrowOutputStream. | |
230 | * @record_batch: A #GArrowRecordBatch to be written. | |
231 | * @options: (nullable): A #GArrowWriteOptions. | |
232 | * @error: (nullable): Return location for a #GError or %NULL. | |
233 | * | |
234 | * Returns: The number of written bytes on success, -1 on error. | |
235 | * | |
236 | * Since: 1.0.0 | |
237 | */ | |
238 | gint64 | |
239 | garrow_output_stream_write_record_batch(GArrowOutputStream *stream, | |
240 | GArrowRecordBatch *record_batch, | |
241 | GArrowWriteOptions *options, | |
242 | GError **error) | |
243 | { | |
244 | auto arrow_stream = garrow_output_stream_get_raw(stream); | |
245 | auto arrow_record_batch = garrow_record_batch_get_raw(record_batch); | |
246 | int64_t buffer_start_offset = 0; | |
247 | int32_t metadata_length; | |
248 | int64_t body_length; | |
249 | arrow::Status status; | |
250 | if (options) { | |
251 | auto arrow_options = garrow_write_options_get_raw(options); | |
252 | status = arrow::ipc::WriteRecordBatch(*arrow_record_batch, | |
253 | buffer_start_offset, | |
254 | arrow_stream.get(), | |
255 | &metadata_length, | |
256 | &body_length, | |
257 | *arrow_options); | |
258 | } else { | |
259 | auto arrow_options = arrow::ipc::IpcWriteOptions::Defaults(); | |
260 | status = arrow::ipc::WriteRecordBatch(*arrow_record_batch, | |
261 | buffer_start_offset, | |
262 | arrow_stream.get(), | |
263 | &metadata_length, | |
264 | &body_length, | |
265 | arrow_options); | |
266 | } | |
267 | if (garrow::check(error, status, "[output-stream][write-record-batch]")) { | |
268 | return metadata_length + body_length; | |
269 | } else { | |
270 | return -1; | |
271 | } | |
272 | } | |
273 | ||
274 | ||
275 | G_DEFINE_TYPE(GArrowFileOutputStream, | |
276 | garrow_file_output_stream, | |
277 | GARROW_TYPE_OUTPUT_STREAM); | |
278 | ||
279 | static void | |
280 | garrow_file_output_stream_init(GArrowFileOutputStream *file_output_stream) | |
281 | { | |
282 | } | |
283 | ||
284 | static void | |
285 | garrow_file_output_stream_class_init(GArrowFileOutputStreamClass *klass) | |
286 | { | |
287 | } | |
288 | ||
289 | /** | |
290 | * garrow_file_output_stream_new: | |
291 | * @path: The path of the file output stream. | |
292 | * @append: Whether the path is opened as append mode or recreate mode. | |
293 | * @error: (nullable): Return location for a #GError or %NULL. | |
294 | * | |
295 | * Returns: (nullable): A newly opened #GArrowFileOutputStream or | |
296 | * %NULL on error. | |
297 | */ | |
298 | GArrowFileOutputStream * | |
299 | garrow_file_output_stream_new(const gchar *path, | |
300 | gboolean append, | |
301 | GError **error) | |
302 | { | |
303 | auto arrow_file_output_stream_result = | |
304 | arrow::io::FileOutputStream::Open(std::string(path), append); | |
305 | if (arrow_file_output_stream_result.ok()) { | |
306 | auto arrow_file_output_stream = | |
307 | arrow_file_output_stream_result.ValueOrDie(); | |
308 | return garrow_file_output_stream_new_raw(&arrow_file_output_stream); | |
309 | } else { | |
310 | std::string context("[io][file-output-stream][open]: <"); | |
311 | context += path; | |
312 | context += ">"; | |
313 | garrow::check(error, arrow_file_output_stream_result, context.c_str()); | |
314 | return NULL; | |
315 | } | |
316 | } | |
317 | ||
318 | ||
319 | G_DEFINE_TYPE(GArrowBufferOutputStream, | |
320 | garrow_buffer_output_stream, | |
321 | GARROW_TYPE_OUTPUT_STREAM); | |
322 | ||
323 | static void | |
324 | garrow_buffer_output_stream_init(GArrowBufferOutputStream *buffer_output_stream) | |
325 | { | |
326 | } | |
327 | ||
328 | static void | |
329 | garrow_buffer_output_stream_class_init(GArrowBufferOutputStreamClass *klass) | |
330 | { | |
331 | } | |
332 | ||
333 | /** | |
334 | * garrow_buffer_output_stream_new: | |
335 | * @buffer: The resizable buffer to be output. | |
336 | * | |
337 | * Returns: (transfer full): A newly created #GArrowBufferOutputStream. | |
338 | */ | |
339 | GArrowBufferOutputStream * | |
340 | garrow_buffer_output_stream_new(GArrowResizableBuffer *buffer) | |
341 | { | |
342 | auto arrow_buffer = garrow_buffer_get_raw(GARROW_BUFFER(buffer)); | |
343 | auto arrow_resizable_buffer = | |
344 | std::static_pointer_cast<arrow::ResizableBuffer>(arrow_buffer); | |
345 | auto arrow_buffer_output_stream = | |
346 | std::make_shared<arrow::io::BufferOutputStream>(arrow_resizable_buffer); | |
347 | return garrow_buffer_output_stream_new_raw(&arrow_buffer_output_stream); | |
348 | } | |
349 | ||
350 | G_END_DECLS | |
351 | ||
352 | ||
353 | namespace garrow { | |
354 | class GIOOutputStream : public arrow::io::OutputStream { | |
355 | public: | |
356 | GIOOutputStream(GOutputStream *output_stream) : | |
357 | output_stream_(output_stream), | |
358 | position_(0) { | |
359 | g_object_ref(output_stream_); | |
360 | } | |
361 | ||
362 | ~GIOOutputStream() { | |
363 | g_object_unref(output_stream_); | |
364 | } | |
365 | ||
366 | GOutputStream *get_output_stream() { | |
367 | return output_stream_; | |
368 | } | |
369 | ||
370 | bool closed() const override { | |
371 | return static_cast<bool>(g_output_stream_is_closed(output_stream_)); | |
372 | } | |
373 | ||
374 | arrow::Status Close() override { | |
375 | GError *error = NULL; | |
376 | if (g_output_stream_close(output_stream_, NULL, &error)) { | |
377 | return arrow::Status::OK(); | |
378 | } else { | |
379 | return garrow_error_to_status(error, | |
380 | arrow::StatusCode::IOError, | |
381 | "[gio-output-stream][close]"); | |
382 | } | |
383 | } | |
384 | ||
385 | arrow::Result<int64_t> Tell() const override { | |
386 | if (G_IS_SEEKABLE(output_stream_)) { | |
387 | return g_seekable_tell(G_SEEKABLE(output_stream_)); | |
388 | } else { | |
389 | return position_; | |
390 | } | |
391 | } | |
392 | ||
393 | arrow::Status Write(const void *data, | |
394 | int64_t n_bytes) override { | |
395 | GError *error = NULL; | |
396 | gsize n_written_bytes; | |
397 | auto successed = g_output_stream_write_all(output_stream_, | |
398 | data, | |
399 | n_bytes, | |
400 | &n_written_bytes, | |
401 | NULL, | |
402 | &error); | |
403 | if (successed) { | |
404 | position_ += n_written_bytes; | |
405 | return arrow::Status::OK(); | |
406 | } else { | |
407 | std::stringstream message("[gio-output-stream][write]"); | |
408 | message << "[" << n_written_bytes << "/" << n_bytes << "]"; | |
409 | return garrow_error_to_status(error, | |
410 | arrow::StatusCode::IOError, | |
411 | message.str().c_str()); | |
412 | } | |
413 | } | |
414 | ||
415 | arrow::Status Flush() override { | |
416 | GError *error = NULL; | |
417 | auto successed = g_output_stream_flush(output_stream_, NULL, &error); | |
418 | if (successed) { | |
419 | return arrow::Status::OK(); | |
420 | } else { | |
421 | return garrow_error_to_status(error, | |
422 | arrow::StatusCode::IOError, | |
423 | "[gio-output-stream][flush]"); | |
424 | } | |
425 | } | |
426 | ||
427 | private: | |
428 | GOutputStream *output_stream_; | |
429 | int64_t position_; | |
430 | }; | |
431 | }; | |
432 | ||
433 | G_BEGIN_DECLS | |
434 | ||
435 | typedef struct GArrowGIOOutputStreamPrivate_ { | |
436 | GOutputStream *raw; | |
437 | } GArrowGIOOutputStreamPrivate; | |
438 | ||
439 | enum { | |
440 | PROP_GIO_RAW = 1 | |
441 | }; | |
442 | ||
443 | G_DEFINE_TYPE_WITH_PRIVATE(GArrowGIOOutputStream, | |
444 | garrow_gio_output_stream, | |
445 | GARROW_TYPE_OUTPUT_STREAM); | |
446 | ||
447 | #define GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(object) \ | |
448 | static_cast<GArrowGIOOutputStreamPrivate *>( \ | |
449 | garrow_gio_output_stream_get_instance_private( \ | |
450 | GARROW_GIO_OUTPUT_STREAM(object))) | |
451 | ||
452 | static void | |
453 | garrow_gio_output_stream_dispose(GObject *object) | |
454 | { | |
455 | auto priv = GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(object); | |
456 | ||
457 | if (priv->raw) { | |
458 | g_object_unref(priv->raw); | |
459 | priv->raw = nullptr; | |
460 | } | |
461 | ||
462 | G_OBJECT_CLASS(garrow_gio_output_stream_parent_class)->dispose(object); | |
463 | } | |
464 | ||
465 | static void | |
466 | garrow_gio_output_stream_set_property(GObject *object, | |
467 | guint prop_id, | |
468 | const GValue *value, | |
469 | GParamSpec *pspec) | |
470 | { | |
471 | auto priv = GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(object); | |
472 | ||
473 | switch (prop_id) { | |
474 | case PROP_GIO_RAW: | |
475 | priv->raw = G_OUTPUT_STREAM(g_value_dup_object(value)); | |
476 | break; | |
477 | default: | |
478 | G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); | |
479 | break; | |
480 | } | |
481 | } | |
482 | ||
483 | static void | |
484 | garrow_gio_output_stream_get_property(GObject *object, | |
485 | guint prop_id, | |
486 | GValue *value, | |
487 | GParamSpec *pspec) | |
488 | { | |
489 | auto priv = GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(object); | |
490 | ||
491 | switch (prop_id) { | |
492 | case PROP_GIO_RAW: | |
493 | g_value_set_object(value, priv->raw); | |
494 | break; | |
495 | default: | |
496 | G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); | |
497 | break; | |
498 | } | |
499 | } | |
500 | ||
501 | static void | |
502 | garrow_gio_output_stream_init(GArrowGIOOutputStream *object) | |
503 | { | |
504 | } | |
505 | ||
506 | static void | |
507 | garrow_gio_output_stream_class_init(GArrowGIOOutputStreamClass *klass) | |
508 | { | |
509 | auto gobject_class = G_OBJECT_CLASS(klass); | |
510 | ||
511 | gobject_class->dispose = garrow_gio_output_stream_dispose; | |
512 | gobject_class->set_property = garrow_gio_output_stream_set_property; | |
513 | gobject_class->get_property = garrow_gio_output_stream_get_property; | |
514 | ||
515 | GParamSpec *spec; | |
516 | spec = g_param_spec_object("raw", | |
517 | "Raw", | |
518 | "The raw GOutputStream *", | |
519 | G_TYPE_OUTPUT_STREAM, | |
520 | static_cast<GParamFlags>(G_PARAM_READWRITE | | |
521 | G_PARAM_CONSTRUCT_ONLY)); | |
522 | g_object_class_install_property(gobject_class, PROP_GIO_RAW, spec); | |
523 | } | |
524 | ||
525 | /** | |
526 | * garrow_gio_output_stream_new: | |
527 | * @gio_output_stream: The stream to be output. | |
528 | * | |
529 | * Returns: (transfer full): A newly created #GArrowGIOOutputStream. | |
530 | */ | |
531 | GArrowGIOOutputStream * | |
532 | garrow_gio_output_stream_new(GOutputStream *gio_output_stream) | |
533 | { | |
534 | auto arrow_output_stream = | |
535 | std::make_shared<garrow::GIOOutputStream>(gio_output_stream); | |
536 | auto object = g_object_new(GARROW_TYPE_GIO_OUTPUT_STREAM, | |
537 | "output-stream", &arrow_output_stream, | |
538 | "raw", gio_output_stream, | |
539 | NULL); | |
540 | auto output_stream = GARROW_GIO_OUTPUT_STREAM(object); | |
541 | return output_stream; | |
542 | } | |
543 | ||
544 | /** | |
545 | * garrow_gio_output_stream_get_raw: | |
546 | * @output_stream: A #GArrowGIOOutputStream. | |
547 | * | |
548 | * Returns: (transfer none): The wrapped #GOutputStream. | |
549 | * | |
550 | * Since: 0.5.0 | |
551 | * | |
552 | * Deprecated: 0.12.0: Use GArrowGIOOutputStream::raw property instead. | |
553 | */ | |
554 | GOutputStream * | |
555 | garrow_gio_output_stream_get_raw(GArrowGIOOutputStream *output_stream) | |
556 | { | |
557 | auto priv = GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(output_stream); | |
558 | return priv->raw; | |
559 | } | |
560 | ||
561 | typedef struct GArrowCompressedOutputStreamPrivate_ { | |
562 | GArrowCodec *codec; | |
563 | GArrowOutputStream *raw; | |
564 | } GArrowCompressedOutputStreamPrivate; | |
565 | ||
566 | enum { | |
567 | PROP_CODEC = 1, | |
568 | PROP_RAW | |
569 | }; | |
570 | ||
571 | G_DEFINE_TYPE_WITH_PRIVATE(GArrowCompressedOutputStream, | |
572 | garrow_compressed_output_stream, | |
573 | GARROW_TYPE_OUTPUT_STREAM) | |
574 | ||
575 | #define GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object) \ | |
576 | static_cast<GArrowCompressedOutputStreamPrivate *>( \ | |
577 | garrow_compressed_output_stream_get_instance_private( \ | |
578 | GARROW_COMPRESSED_OUTPUT_STREAM(object))) | |
579 | ||
580 | static void | |
581 | garrow_compressed_output_stream_dispose(GObject *object) | |
582 | { | |
583 | auto priv = GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object); | |
584 | ||
585 | if (priv->codec) { | |
586 | g_object_unref(priv->codec); | |
587 | priv->codec = NULL; | |
588 | } | |
589 | ||
590 | if (priv->raw) { | |
591 | g_object_unref(priv->raw); | |
592 | priv->raw = NULL; | |
593 | } | |
594 | ||
595 | G_OBJECT_CLASS(garrow_compressed_output_stream_parent_class)->dispose(object); | |
596 | } | |
597 | ||
598 | static void | |
599 | garrow_compressed_output_stream_set_property(GObject *object, | |
600 | guint prop_id, | |
601 | const GValue *value, | |
602 | GParamSpec *pspec) | |
603 | { | |
604 | auto priv = GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object); | |
605 | ||
606 | switch (prop_id) { | |
607 | case PROP_CODEC: | |
608 | priv->codec = GARROW_CODEC(g_value_dup_object(value)); | |
609 | break; | |
610 | case PROP_RAW: | |
611 | priv->raw = GARROW_OUTPUT_STREAM(g_value_dup_object(value)); | |
612 | break; | |
613 | default: | |
614 | G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); | |
615 | break; | |
616 | } | |
617 | } | |
618 | ||
619 | static void | |
620 | garrow_compressed_output_stream_get_property(GObject *object, | |
621 | guint prop_id, | |
622 | GValue *value, | |
623 | GParamSpec *pspec) | |
624 | { | |
625 | auto priv = GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object); | |
626 | ||
627 | switch (prop_id) { | |
628 | case PROP_CODEC: | |
629 | g_value_set_object(value, priv->codec); | |
630 | break; | |
631 | case PROP_RAW: | |
632 | g_value_set_object(value, priv->raw); | |
633 | break; | |
634 | default: | |
635 | G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); | |
636 | break; | |
637 | } | |
638 | } | |
639 | ||
640 | static void | |
641 | garrow_compressed_output_stream_init(GArrowCompressedOutputStream *object) | |
642 | { | |
643 | } | |
644 | ||
645 | static void | |
646 | garrow_compressed_output_stream_class_init(GArrowCompressedOutputStreamClass *klass) | |
647 | { | |
648 | auto gobject_class = G_OBJECT_CLASS(klass); | |
649 | ||
650 | gobject_class->dispose = garrow_compressed_output_stream_dispose; | |
651 | gobject_class->set_property = garrow_compressed_output_stream_set_property; | |
652 | gobject_class->get_property = garrow_compressed_output_stream_get_property; | |
653 | ||
654 | GParamSpec *spec; | |
655 | spec = g_param_spec_object("codec", | |
656 | "Codec", | |
657 | "The codec for the stream", | |
658 | GARROW_TYPE_CODEC, | |
659 | static_cast<GParamFlags>(G_PARAM_READWRITE | | |
660 | G_PARAM_CONSTRUCT_ONLY)); | |
661 | g_object_class_install_property(gobject_class, PROP_CODEC, spec); | |
662 | ||
663 | spec = g_param_spec_object("raw", | |
664 | "Raw", | |
665 | "The underlying raw output stream", | |
666 | GARROW_TYPE_OUTPUT_STREAM, | |
667 | static_cast<GParamFlags>(G_PARAM_READWRITE | | |
668 | G_PARAM_CONSTRUCT_ONLY)); | |
669 | g_object_class_install_property(gobject_class, PROP_RAW, spec); | |
670 | } | |
671 | ||
672 | /** | |
673 | * garrow_compressed_output_stream_new: | |
674 | * @codec: A #GArrowCodec for compressed data in the @raw. | |
675 | * @raw: A #GArrowOutputStream that is a sink for compressed data. | |
676 | * @error: (nullable): Return location for a #GError or %NULL. | |
677 | * | |
678 | * Returns: A newly created #GArrowCompressedOutputStream. | |
679 | * | |
680 | * Since: 0.12.0 | |
681 | */ | |
682 | GArrowCompressedOutputStream * | |
683 | garrow_compressed_output_stream_new(GArrowCodec *codec, | |
684 | GArrowOutputStream *raw, | |
685 | GError **error) | |
686 | { | |
687 | auto arrow_codec = garrow_codec_get_raw(codec).get(); | |
688 | auto arrow_raw = garrow_output_stream_get_raw(raw); | |
689 | auto arrow_stream = arrow::io::CompressedOutputStream::Make(arrow_codec, | |
690 | arrow_raw); | |
691 | if (garrow::check(error, arrow_stream, "[compressed-output-stream][new]")) { | |
692 | return garrow_compressed_output_stream_new_raw(&(arrow_stream.ValueOrDie()), | |
693 | codec, | |
694 | raw); | |
695 | } else { | |
696 | return NULL; | |
697 | } | |
698 | } | |
699 | ||
700 | G_END_DECLS | |
701 | ||
702 | ||
703 | GArrowOutputStream * | |
704 | garrow_output_stream_new_raw(std::shared_ptr<arrow::io::OutputStream> *arrow_output_stream) | |
705 | { | |
706 | auto output_stream = | |
707 | GARROW_OUTPUT_STREAM(g_object_new(GARROW_TYPE_OUTPUT_STREAM, | |
708 | "output-stream", arrow_output_stream, | |
709 | NULL)); | |
710 | return output_stream; | |
711 | } | |
712 | ||
713 | std::shared_ptr<arrow::io::OutputStream> | |
714 | garrow_output_stream_get_raw(GArrowOutputStream *output_stream) | |
715 | { | |
716 | auto priv = GARROW_OUTPUT_STREAM_GET_PRIVATE(output_stream); | |
717 | return priv->output_stream; | |
718 | } | |
719 | ||
720 | ||
721 | GArrowFileOutputStream * | |
722 | garrow_file_output_stream_new_raw(std::shared_ptr<arrow::io::FileOutputStream> *arrow_file_output_stream) | |
723 | { | |
724 | auto file_output_stream = | |
725 | GARROW_FILE_OUTPUT_STREAM(g_object_new(GARROW_TYPE_FILE_OUTPUT_STREAM, | |
726 | "output-stream", arrow_file_output_stream, | |
727 | NULL)); | |
728 | return file_output_stream; | |
729 | } | |
730 | ||
731 | GArrowBufferOutputStream * | |
732 | garrow_buffer_output_stream_new_raw(std::shared_ptr<arrow::io::BufferOutputStream> *arrow_buffer_output_stream) | |
733 | { | |
734 | auto buffer_output_stream = | |
735 | GARROW_BUFFER_OUTPUT_STREAM(g_object_new(GARROW_TYPE_BUFFER_OUTPUT_STREAM, | |
736 | "output-stream", arrow_buffer_output_stream, | |
737 | NULL)); | |
738 | return buffer_output_stream; | |
739 | } | |
740 | ||
741 | GArrowCompressedOutputStream * | |
742 | garrow_compressed_output_stream_new_raw(std::shared_ptr<arrow::io::CompressedOutputStream> *arrow_raw, | |
743 | GArrowCodec *codec, | |
744 | GArrowOutputStream *raw) | |
745 | { | |
746 | auto compressed_output_stream = | |
747 | g_object_new(GARROW_TYPE_COMPRESSED_OUTPUT_STREAM, | |
748 | "output-stream", arrow_raw, | |
749 | "codec", codec, | |
750 | "raw", raw, | |
751 | NULL); | |
752 | return GARROW_COMPRESSED_OUTPUT_STREAM(compressed_output_stream); | |
753 | } | |
754 | ||
755 | std::shared_ptr<arrow::io::OutputStream> | |
756 | garrow_compressed_output_stream_get_raw(GArrowCompressedOutputStream *compressed_output_stream) | |
757 | { | |
758 | auto output_stream = GARROW_OUTPUT_STREAM(compressed_output_stream); | |
759 | auto arrow_output_stream = garrow_output_stream_get_raw(output_stream); | |
760 | auto arrow_compressed_output_stream = | |
761 | std::static_pointer_cast<arrow::io::CompressedOutputStream>(arrow_output_stream); | |
762 | return arrow_compressed_output_stream->raw(); | |
763 | } |