]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/c_glib/arrow-glib/output-stream.cpp
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / c_glib / arrow-glib / output-stream.cpp
diff --git a/ceph/src/arrow/c_glib/arrow-glib/output-stream.cpp b/ceph/src/arrow/c_glib/arrow-glib/output-stream.cpp
new file mode 100644 (file)
index 0000000..9eaee9a
--- /dev/null
@@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/io/memory.h>
+#include <arrow/ipc/writer.h>
+
+#include <arrow-glib/buffer.hpp>
+#include <arrow-glib/codec.hpp>
+#include <arrow-glib/error.hpp>
+#include <arrow-glib/file.hpp>
+#include <arrow-glib/ipc-options.hpp>
+#include <arrow-glib/output-stream.hpp>
+#include <arrow-glib/record-batch.hpp>
+#include <arrow-glib/tensor.hpp>
+#include <arrow-glib/writable.hpp>
+
+#include <iostream>
+#include <sstream>
+
+G_BEGIN_DECLS
+
+/**
+ * SECTION: output-stream
+ * @section_id: output-stream-classes
+ * @title: Output stream classes
+ * @include: arrow-glib/arrow-glib.h
+ *
+ * #GArrowOutputStream is an interface for stream output. Stream
+ * output is file based and writable
+ *
+ * #GArrowFileOutputStream is a class for file output stream.
+ *
+ * #GArrowBufferOutputStream is a class for buffer output stream.
+ *
+ * #GArrowGIOOutputStream is a class for `GOutputStream` based output
+ * stream.
+ *
+ * #GArrowCompressedOutputStream is a class to write compressed data to
+ * output stream.
+ */
+
+typedef struct GArrowOutputStreamPrivate_ {
+  std::shared_ptr<arrow::io::OutputStream> output_stream;
+} GArrowOutputStreamPrivate;
+
+enum {
+  PROP_0,
+  PROP_OUTPUT_STREAM
+};
+
+static std::shared_ptr<arrow::io::FileInterface>
+garrow_output_stream_get_raw_file_interface(GArrowFile *file)
+{
+  auto output_stream = GARROW_OUTPUT_STREAM(file);
+  auto arrow_output_stream = garrow_output_stream_get_raw(output_stream);
+  return arrow_output_stream;
+}
+
+static void
+garrow_output_stream_file_interface_init(GArrowFileInterface *iface)
+{
+  iface->get_raw = garrow_output_stream_get_raw_file_interface;
+}
+
+static std::shared_ptr<arrow::io::Writable>
+garrow_output_stream_get_raw_writable_interface(GArrowWritable *writable)
+{
+  auto output_stream = GARROW_OUTPUT_STREAM(writable);
+  auto arrow_output_stream = garrow_output_stream_get_raw(output_stream);
+  return arrow_output_stream;
+}
+
+static void
+garrow_output_stream_writable_interface_init(GArrowWritableInterface *iface)
+{
+  iface->get_raw = garrow_output_stream_get_raw_writable_interface;
+}
+
+G_DEFINE_TYPE_WITH_CODE(GArrowOutputStream,
+                        garrow_output_stream,
+                        G_TYPE_OBJECT,
+                        G_ADD_PRIVATE(GArrowOutputStream)
+                        G_IMPLEMENT_INTERFACE(GARROW_TYPE_FILE,
+                                              garrow_output_stream_file_interface_init)
+                        G_IMPLEMENT_INTERFACE(GARROW_TYPE_WRITABLE,
+                                              garrow_output_stream_writable_interface_init));
+
+#define GARROW_OUTPUT_STREAM_GET_PRIVATE(obj)         \
+  static_cast<GArrowOutputStreamPrivate *>(           \
+     garrow_output_stream_get_instance_private(       \
+       GARROW_OUTPUT_STREAM(obj)))
+
+static void
+garrow_output_stream_finalize(GObject *object)
+{
+  auto priv = GARROW_OUTPUT_STREAM_GET_PRIVATE(object);
+
+  priv->output_stream.~shared_ptr();
+
+  G_OBJECT_CLASS(garrow_output_stream_parent_class)->finalize(object);
+}
+
+static void
+garrow_output_stream_set_property(GObject *object,
+                                          guint prop_id,
+                                          const GValue *value,
+                                          GParamSpec *pspec)
+{
+  auto priv = GARROW_OUTPUT_STREAM_GET_PRIVATE(object);
+
+  switch (prop_id) {
+  case PROP_OUTPUT_STREAM:
+    priv->output_stream =
+      *static_cast<std::shared_ptr<arrow::io::OutputStream> *>(g_value_get_pointer(value));
+    break;
+  default:
+    G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+    break;
+  }
+}
+
+static void
+garrow_output_stream_get_property(GObject *object,
+                                          guint prop_id,
+                                          GValue *value,
+                                          GParamSpec *pspec)
+{
+  switch (prop_id) {
+  default:
+    G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+    break;
+  }
+}
+
+static void
+garrow_output_stream_init(GArrowOutputStream *object)
+{
+  auto priv = GARROW_OUTPUT_STREAM_GET_PRIVATE(object);
+  new(&priv->output_stream) std::shared_ptr<arrow::io::OutputStream>;
+}
+
+static void
+garrow_output_stream_class_init(GArrowOutputStreamClass *klass)
+{
+  auto gobject_class = G_OBJECT_CLASS(klass);
+
+  gobject_class->finalize     = garrow_output_stream_finalize;
+  gobject_class->set_property = garrow_output_stream_set_property;
+  gobject_class->get_property = garrow_output_stream_get_property;
+
+  GParamSpec *spec;
+  spec = g_param_spec_pointer("output-stream",
+                              "io::OutputStream",
+                              "The raw std::shared<arrow::io::OutputStream> *",
+                              static_cast<GParamFlags>(G_PARAM_WRITABLE |
+                                                       G_PARAM_CONSTRUCT_ONLY));
+  g_object_class_install_property(gobject_class, PROP_OUTPUT_STREAM, spec);
+}
+
+/**
+ * garrow_output_stream_align:
+ * @stream: A #GArrowOutputStream.
+ * @alignment: The byte multiple for the metadata prefix, usually 8
+ *   or 64, to ensure the body starts on a multiple of that alignment.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Returns: %TRUE on success, %FALSE on error.
+ *
+ * Since: 0.11.0
+ */
+gboolean
+garrow_output_stream_align(GArrowOutputStream *stream,
+                           gint32 alignment,
+                           GError **error)
+{
+  auto arrow_stream = garrow_output_stream_get_raw(stream);
+  auto status = arrow::ipc::AlignStream(arrow_stream.get(), alignment);
+  return garrow::check(error, status, "[output-stream][align]");
+}
+
+/**
+ * garrow_output_stream_write_tensor:
+ * @stream: A #GArrowOutputStream.
+ * @tensor: A #GArrowTensor to be written.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Returns: The number of written bytes on success, -1 on error.
+ *
+ * Since: 0.4.0
+ */
+gint64
+garrow_output_stream_write_tensor(GArrowOutputStream *stream,
+                                  GArrowTensor *tensor,
+                                  GError **error)
+{
+  auto arrow_stream = garrow_output_stream_get_raw(stream);
+  auto arrow_tensor = garrow_tensor_get_raw(tensor);
+  int32_t metadata_length;
+  int64_t body_length;
+  auto status = arrow::ipc::WriteTensor(*arrow_tensor,
+                                        arrow_stream.get(),
+                                        &metadata_length,
+                                        &body_length);
+  if (garrow::check(error, status, "[output-stream][write-tensor]")) {
+    return metadata_length + body_length;
+  } else {
+    return -1;
+  }
+}
+
+/**
+ * garrow_output_stream_write_record_batch:
+ * @stream: A #GArrowOutputStream.
+ * @record_batch: A #GArrowRecordBatch to be written.
+ * @options: (nullable): A #GArrowWriteOptions.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Returns: The number of written bytes on success, -1 on error.
+ *
+ * Since: 1.0.0
+ */
+gint64
+garrow_output_stream_write_record_batch(GArrowOutputStream *stream,
+                                        GArrowRecordBatch *record_batch,
+                                        GArrowWriteOptions *options,
+                                        GError **error)
+{
+  auto arrow_stream = garrow_output_stream_get_raw(stream);
+  auto arrow_record_batch = garrow_record_batch_get_raw(record_batch);
+  int64_t buffer_start_offset = 0;
+  int32_t metadata_length;
+  int64_t body_length;
+  arrow::Status status;
+  if (options) {
+    auto arrow_options = garrow_write_options_get_raw(options);
+    status = arrow::ipc::WriteRecordBatch(*arrow_record_batch,
+                                          buffer_start_offset,
+                                          arrow_stream.get(),
+                                          &metadata_length,
+                                          &body_length,
+                                          *arrow_options);
+  } else {
+    auto arrow_options = arrow::ipc::IpcWriteOptions::Defaults();
+    status = arrow::ipc::WriteRecordBatch(*arrow_record_batch,
+                                          buffer_start_offset,
+                                          arrow_stream.get(),
+                                          &metadata_length,
+                                          &body_length,
+                                          arrow_options);
+  }
+  if (garrow::check(error, status, "[output-stream][write-record-batch]")) {
+    return metadata_length + body_length;
+  } else {
+    return -1;
+  }
+}
+
+
+G_DEFINE_TYPE(GArrowFileOutputStream,
+              garrow_file_output_stream,
+              GARROW_TYPE_OUTPUT_STREAM);
+
+static void
+garrow_file_output_stream_init(GArrowFileOutputStream *file_output_stream)
+{
+}
+
+static void
+garrow_file_output_stream_class_init(GArrowFileOutputStreamClass *klass)
+{
+}
+
+/**
+ * garrow_file_output_stream_new:
+ * @path: The path of the file output stream.
+ * @append: Whether the path is opened as append mode or recreate mode.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Returns: (nullable): A newly opened #GArrowFileOutputStream or
+ *   %NULL on error.
+ */
+GArrowFileOutputStream *
+garrow_file_output_stream_new(const gchar *path,
+                              gboolean append,
+                              GError **error)
+{
+  auto arrow_file_output_stream_result =
+    arrow::io::FileOutputStream::Open(std::string(path), append);
+  if (arrow_file_output_stream_result.ok()) {
+    auto arrow_file_output_stream =
+      arrow_file_output_stream_result.ValueOrDie();
+    return garrow_file_output_stream_new_raw(&arrow_file_output_stream);
+  } else {
+    std::string context("[io][file-output-stream][open]: <");
+    context += path;
+    context += ">";
+    garrow::check(error, arrow_file_output_stream_result, context.c_str());
+    return NULL;
+  }
+}
+
+
+G_DEFINE_TYPE(GArrowBufferOutputStream,
+              garrow_buffer_output_stream,
+              GARROW_TYPE_OUTPUT_STREAM);
+
+static void
+garrow_buffer_output_stream_init(GArrowBufferOutputStream *buffer_output_stream)
+{
+}
+
+static void
+garrow_buffer_output_stream_class_init(GArrowBufferOutputStreamClass *klass)
+{
+}
+
+/**
+ * garrow_buffer_output_stream_new:
+ * @buffer: The resizable buffer to be output.
+ *
+ * Returns: (transfer full): A newly created #GArrowBufferOutputStream.
+ */
+GArrowBufferOutputStream *
+garrow_buffer_output_stream_new(GArrowResizableBuffer *buffer)
+{
+  auto arrow_buffer = garrow_buffer_get_raw(GARROW_BUFFER(buffer));
+  auto arrow_resizable_buffer =
+    std::static_pointer_cast<arrow::ResizableBuffer>(arrow_buffer);
+  auto arrow_buffer_output_stream =
+    std::make_shared<arrow::io::BufferOutputStream>(arrow_resizable_buffer);
+  return garrow_buffer_output_stream_new_raw(&arrow_buffer_output_stream);
+}
+
+G_END_DECLS
+
+
+namespace garrow {
+  class GIOOutputStream : public arrow::io::OutputStream {
+  public:
+    GIOOutputStream(GOutputStream *output_stream) :
+      output_stream_(output_stream),
+      position_(0) {
+      g_object_ref(output_stream_);
+    }
+
+    ~GIOOutputStream() {
+      g_object_unref(output_stream_);
+    }
+
+    GOutputStream *get_output_stream() {
+      return output_stream_;
+    }
+
+    bool closed() const override {
+      return static_cast<bool>(g_output_stream_is_closed(output_stream_));
+    }
+
+    arrow::Status Close() override {
+      GError *error = NULL;
+      if (g_output_stream_close(output_stream_, NULL, &error)) {
+        return arrow::Status::OK();
+      } else {
+        return garrow_error_to_status(error,
+                                      arrow::StatusCode::IOError,
+                                      "[gio-output-stream][close]");
+      }
+    }
+
+    arrow::Result<int64_t> Tell() const override {
+      if (G_IS_SEEKABLE(output_stream_)) {
+        return g_seekable_tell(G_SEEKABLE(output_stream_));
+      } else {
+        return position_;
+      }
+    }
+
+    arrow::Status Write(const void *data,
+                        int64_t n_bytes) override {
+      GError *error = NULL;
+      gsize n_written_bytes;
+      auto successed = g_output_stream_write_all(output_stream_,
+                                                 data,
+                                                 n_bytes,
+                                                 &n_written_bytes,
+                                                 NULL,
+                                                 &error);
+      if (successed) {
+        position_ += n_written_bytes;
+        return arrow::Status::OK();
+      } else {
+        std::stringstream message("[gio-output-stream][write]");
+        message << "[" << n_written_bytes << "/" << n_bytes << "]";
+        return garrow_error_to_status(error,
+                                      arrow::StatusCode::IOError,
+                                      message.str().c_str());
+      }
+    }
+
+    arrow::Status Flush() override {
+      GError *error = NULL;
+      auto successed = g_output_stream_flush(output_stream_, NULL, &error);
+      if (successed) {
+        return arrow::Status::OK();
+      } else {
+        return garrow_error_to_status(error,
+                                      arrow::StatusCode::IOError,
+                                      "[gio-output-stream][flush]");
+      }
+    }
+
+  private:
+    GOutputStream *output_stream_;
+    int64_t position_;
+  };
+};
+
+G_BEGIN_DECLS
+
+typedef struct GArrowGIOOutputStreamPrivate_ {
+  GOutputStream *raw;
+} GArrowGIOOutputStreamPrivate;
+
+enum {
+  PROP_GIO_RAW = 1
+};
+
+G_DEFINE_TYPE_WITH_PRIVATE(GArrowGIOOutputStream,
+                           garrow_gio_output_stream,
+                           GARROW_TYPE_OUTPUT_STREAM);
+
+#define GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(object)    \
+  static_cast<GArrowGIOOutputStreamPrivate *>(          \
+    garrow_gio_output_stream_get_instance_private(      \
+      GARROW_GIO_OUTPUT_STREAM(object)))
+
+static void
+garrow_gio_output_stream_dispose(GObject *object)
+{
+  auto priv = GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(object);
+
+  if (priv->raw) {
+    g_object_unref(priv->raw);
+    priv->raw = nullptr;
+  }
+
+  G_OBJECT_CLASS(garrow_gio_output_stream_parent_class)->dispose(object);
+}
+
+static void
+garrow_gio_output_stream_set_property(GObject *object,
+                                      guint prop_id,
+                                      const GValue *value,
+                                      GParamSpec *pspec)
+{
+  auto priv = GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(object);
+
+  switch (prop_id) {
+  case PROP_GIO_RAW:
+    priv->raw = G_OUTPUT_STREAM(g_value_dup_object(value));
+    break;
+  default:
+    G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+    break;
+  }
+}
+
+static void
+garrow_gio_output_stream_get_property(GObject *object,
+                                      guint prop_id,
+                                      GValue *value,
+                                      GParamSpec *pspec)
+{
+  auto priv = GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(object);
+
+  switch (prop_id) {
+  case PROP_GIO_RAW:
+    g_value_set_object(value, priv->raw);
+    break;
+  default:
+    G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+    break;
+  }
+}
+
+static void
+garrow_gio_output_stream_init(GArrowGIOOutputStream *object)
+{
+}
+
+static void
+garrow_gio_output_stream_class_init(GArrowGIOOutputStreamClass *klass)
+{
+  auto gobject_class = G_OBJECT_CLASS(klass);
+
+  gobject_class->dispose      = garrow_gio_output_stream_dispose;
+  gobject_class->set_property = garrow_gio_output_stream_set_property;
+  gobject_class->get_property = garrow_gio_output_stream_get_property;
+
+  GParamSpec *spec;
+  spec = g_param_spec_object("raw",
+                             "Raw",
+                             "The raw GOutputStream *",
+                             G_TYPE_OUTPUT_STREAM,
+                             static_cast<GParamFlags>(G_PARAM_READWRITE |
+                                                      G_PARAM_CONSTRUCT_ONLY));
+  g_object_class_install_property(gobject_class, PROP_GIO_RAW, spec);
+}
+
+/**
+ * garrow_gio_output_stream_new:
+ * @gio_output_stream: The stream to be output.
+ *
+ * Returns: (transfer full): A newly created #GArrowGIOOutputStream.
+ */
+GArrowGIOOutputStream *
+garrow_gio_output_stream_new(GOutputStream *gio_output_stream)
+{
+  auto arrow_output_stream =
+    std::make_shared<garrow::GIOOutputStream>(gio_output_stream);
+  auto object = g_object_new(GARROW_TYPE_GIO_OUTPUT_STREAM,
+                             "output-stream", &arrow_output_stream,
+                             "raw", gio_output_stream,
+                             NULL);
+  auto output_stream = GARROW_GIO_OUTPUT_STREAM(object);
+  return output_stream;
+}
+
+/**
+ * garrow_gio_output_stream_get_raw:
+ * @output_stream: A #GArrowGIOOutputStream.
+ *
+ * Returns: (transfer none): The wrapped #GOutputStream.
+ *
+ * Since: 0.5.0
+ *
+ * Deprecated: 0.12.0: Use GArrowGIOOutputStream::raw property instead.
+ */
+GOutputStream *
+garrow_gio_output_stream_get_raw(GArrowGIOOutputStream *output_stream)
+{
+  auto priv = GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(output_stream);
+  return priv->raw;
+}
+
+typedef struct GArrowCompressedOutputStreamPrivate_ {
+  GArrowCodec *codec;
+  GArrowOutputStream *raw;
+} GArrowCompressedOutputStreamPrivate;
+
+enum {
+  PROP_CODEC = 1,
+  PROP_RAW
+};
+
+G_DEFINE_TYPE_WITH_PRIVATE(GArrowCompressedOutputStream,
+                           garrow_compressed_output_stream,
+                           GARROW_TYPE_OUTPUT_STREAM)
+
+#define GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object)     \
+  static_cast<GArrowCompressedOutputStreamPrivate *>(           \
+    garrow_compressed_output_stream_get_instance_private(       \
+      GARROW_COMPRESSED_OUTPUT_STREAM(object)))
+
+static void
+garrow_compressed_output_stream_dispose(GObject *object)
+{
+  auto priv = GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object);
+
+  if (priv->codec) {
+    g_object_unref(priv->codec);
+    priv->codec = NULL;
+  }
+
+  if (priv->raw) {
+    g_object_unref(priv->raw);
+    priv->raw = NULL;
+  }
+
+  G_OBJECT_CLASS(garrow_compressed_output_stream_parent_class)->dispose(object);
+}
+
+static void
+garrow_compressed_output_stream_set_property(GObject *object,
+                                             guint prop_id,
+                                             const GValue *value,
+                                             GParamSpec *pspec)
+{
+  auto priv = GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object);
+
+  switch (prop_id) {
+  case PROP_CODEC:
+    priv->codec = GARROW_CODEC(g_value_dup_object(value));
+    break;
+  case PROP_RAW:
+    priv->raw = GARROW_OUTPUT_STREAM(g_value_dup_object(value));
+    break;
+  default:
+    G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+    break;
+  }
+}
+
+static void
+garrow_compressed_output_stream_get_property(GObject *object,
+                                             guint prop_id,
+                                             GValue *value,
+                                             GParamSpec *pspec)
+{
+  auto priv = GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object);
+
+  switch (prop_id) {
+  case PROP_CODEC:
+    g_value_set_object(value, priv->codec);
+    break;
+  case PROP_RAW:
+    g_value_set_object(value, priv->raw);
+    break;
+  default:
+    G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+    break;
+  }
+}
+
+static void
+garrow_compressed_output_stream_init(GArrowCompressedOutputStream *object)
+{
+}
+
+static void
+garrow_compressed_output_stream_class_init(GArrowCompressedOutputStreamClass *klass)
+{
+  auto gobject_class = G_OBJECT_CLASS(klass);
+
+  gobject_class->dispose      = garrow_compressed_output_stream_dispose;
+  gobject_class->set_property = garrow_compressed_output_stream_set_property;
+  gobject_class->get_property = garrow_compressed_output_stream_get_property;
+
+  GParamSpec *spec;
+  spec = g_param_spec_object("codec",
+                             "Codec",
+                             "The codec for the stream",
+                             GARROW_TYPE_CODEC,
+                             static_cast<GParamFlags>(G_PARAM_READWRITE |
+                                                      G_PARAM_CONSTRUCT_ONLY));
+  g_object_class_install_property(gobject_class, PROP_CODEC, spec);
+
+  spec = g_param_spec_object("raw",
+                             "Raw",
+                             "The underlying raw output stream",
+                             GARROW_TYPE_OUTPUT_STREAM,
+                             static_cast<GParamFlags>(G_PARAM_READWRITE |
+                                                      G_PARAM_CONSTRUCT_ONLY));
+  g_object_class_install_property(gobject_class, PROP_RAW, spec);
+}
+
+/**
+ * garrow_compressed_output_stream_new:
+ * @codec: A #GArrowCodec for compressed data in the @raw.
+ * @raw: A #GArrowOutputStream that is a sink for compressed data.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Returns: A newly created #GArrowCompressedOutputStream.
+ *
+ * Since: 0.12.0
+ */
+GArrowCompressedOutputStream *
+garrow_compressed_output_stream_new(GArrowCodec *codec,
+                                    GArrowOutputStream *raw,
+                                    GError **error)
+{
+  auto arrow_codec = garrow_codec_get_raw(codec).get();
+  auto arrow_raw = garrow_output_stream_get_raw(raw);
+  auto arrow_stream = arrow::io::CompressedOutputStream::Make(arrow_codec,
+                                                              arrow_raw);
+  if (garrow::check(error, arrow_stream, "[compressed-output-stream][new]")) {
+    return garrow_compressed_output_stream_new_raw(&(arrow_stream.ValueOrDie()),
+                                                   codec,
+                                                   raw);
+  } else {
+    return NULL;
+  }
+}
+
+G_END_DECLS
+
+
+GArrowOutputStream *
+garrow_output_stream_new_raw(std::shared_ptr<arrow::io::OutputStream> *arrow_output_stream)
+{
+  auto output_stream =
+    GARROW_OUTPUT_STREAM(g_object_new(GARROW_TYPE_OUTPUT_STREAM,
+                                      "output-stream", arrow_output_stream,
+                                      NULL));
+  return output_stream;
+}
+
+std::shared_ptr<arrow::io::OutputStream>
+garrow_output_stream_get_raw(GArrowOutputStream *output_stream)
+{
+  auto priv = GARROW_OUTPUT_STREAM_GET_PRIVATE(output_stream);
+  return priv->output_stream;
+}
+
+
+GArrowFileOutputStream *
+garrow_file_output_stream_new_raw(std::shared_ptr<arrow::io::FileOutputStream> *arrow_file_output_stream)
+{
+  auto file_output_stream =
+    GARROW_FILE_OUTPUT_STREAM(g_object_new(GARROW_TYPE_FILE_OUTPUT_STREAM,
+                                           "output-stream", arrow_file_output_stream,
+                                           NULL));
+  return file_output_stream;
+}
+
+GArrowBufferOutputStream *
+garrow_buffer_output_stream_new_raw(std::shared_ptr<arrow::io::BufferOutputStream> *arrow_buffer_output_stream)
+{
+  auto buffer_output_stream =
+    GARROW_BUFFER_OUTPUT_STREAM(g_object_new(GARROW_TYPE_BUFFER_OUTPUT_STREAM,
+                                             "output-stream", arrow_buffer_output_stream,
+                                             NULL));
+  return buffer_output_stream;
+}
+
+GArrowCompressedOutputStream *
+garrow_compressed_output_stream_new_raw(std::shared_ptr<arrow::io::CompressedOutputStream> *arrow_raw,
+                                        GArrowCodec *codec,
+                                        GArrowOutputStream *raw)
+{
+  auto compressed_output_stream =
+    g_object_new(GARROW_TYPE_COMPRESSED_OUTPUT_STREAM,
+                 "output-stream", arrow_raw,
+                 "codec", codec,
+                 "raw", raw,
+                 NULL);
+  return GARROW_COMPRESSED_OUTPUT_STREAM(compressed_output_stream);
+}
+
+std::shared_ptr<arrow::io::OutputStream>
+garrow_compressed_output_stream_get_raw(GArrowCompressedOutputStream *compressed_output_stream)
+{
+  auto output_stream = GARROW_OUTPUT_STREAM(compressed_output_stream);
+  auto arrow_output_stream = garrow_output_stream_get_raw(output_stream);
+  auto arrow_compressed_output_stream =
+    std::static_pointer_cast<arrow::io::CompressedOutputStream>(arrow_output_stream);
+  return arrow_compressed_output_stream->raw();
+}