]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/cpp/src/arrow/dataset/dataset_writer.h
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / dataset / dataset_writer.h
diff --git a/ceph/src/arrow/cpp/src/arrow/dataset/dataset_writer.h b/ceph/src/arrow/cpp/src/arrow/dataset/dataset_writer.h
new file mode 100644 (file)
index 0000000..b014f96
--- /dev/null
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "arrow/dataset/file_base.h"
+#include "arrow/record_batch.h"
+#include "arrow/status.h"
+#include "arrow/util/async_util.h"
+#include "arrow/util/future.h"
+
+namespace arrow {
+namespace dataset {
+namespace internal {
+
+constexpr uint64_t kDefaultDatasetWriterMaxRowsQueued = 64 * 1024 * 1024;
+
+/// \brief Utility class that manages a set of writers to different paths
+///
+/// Writers may be closed and reopened (and a new file created) based on the dataset
+/// write options (for example, max_rows_per_file or max_open_files)
+///
+/// The dataset writer enforces its own back pressure based on the # of rows (as opposed
+/// to # of batches which is how it is typically enforced elsewhere) and # of files.
+class ARROW_DS_EXPORT DatasetWriter {
+ public:
+  /// \brief Create a dataset writer
+  ///
+  /// Will fail if basename_template is invalid or if there is existing data and
+  /// existing_data_behavior is kError
+  ///
+  /// \param write_options options to control how the data should be written
+  /// \param max_rows_queued max # of rows allowed to be queued before the dataset_writer
+  ///                        will ask for backpressure
+  static Result<std::unique_ptr<DatasetWriter>> Make(
+      FileSystemDatasetWriteOptions write_options,
+      uint64_t max_rows_queued = kDefaultDatasetWriterMaxRowsQueued);
+
+  ~DatasetWriter();
+
+  /// \brief Write a batch to the dataset
+  /// \param[in] batch The batch to write
+  /// \param[in] directory The directory to write to
+  ///
+  /// Note: The written filename will be {directory}/{filename_factory(i)} where i is a
+  /// counter controlled by `max_open_files` and `max_rows_per_file`
+  ///
+  /// If multiple WriteRecordBatch calls arrive with the same `directory` then the batches
+  /// may be written to the same file.
+  ///
+  /// The returned future will be marked finished when the record batch has been queued
+  /// to be written.  If the returned future is unfinished then this indicates the dataset
+  /// writer's queue is full and the data provider should pause.
+  ///
+  /// This method is NOT async reentrant.  The returned future will only be unfinished
+  /// if back pressure needs to be applied.  Async reentrancy is not necessary for
+  /// concurrent writes to happen.  Calling this method again before the previous future
+  /// completes will not just violate max_rows_queued but likely lead to race conditions.
+  ///
+  /// One thing to note is that the ordering of your data can affect your maximum
+  /// potential parallelism.  If this seems odd then consider a dataset where the first
+  /// 1000 batches go to the same directory and then the 1001st batch goes to a different
+  /// directory.  The only way to get two parallel writes immediately would be to queue
+  /// all 1000 pending writes to the first directory.
+  Future<> WriteRecordBatch(std::shared_ptr<RecordBatch> batch,
+                            const std::string& directory);
+
+  /// Finish all pending writes and close any open files
+  Future<> Finish();
+
+ protected:
+  DatasetWriter(FileSystemDatasetWriteOptions write_options,
+                uint64_t max_rows_queued = kDefaultDatasetWriterMaxRowsQueued);
+
+  class DatasetWriterImpl;
+  std::unique_ptr<DatasetWriterImpl, util::DestroyingDeleter<DatasetWriterImpl>> impl_;
+};
+
+}  // namespace internal
+}  // namespace dataset
+}  // namespace arrow