ceph/src/arrow/cpp/src/arrow/python/arrow_to_pandas.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 // Functions for pandas conversion via NumPy
19
20 #include "arrow/python/arrow_to_pandas.h"
21 #include "arrow/python/numpy_interop.h" // IWYU pragma: expand
22
23 #include <cmath>
24 #include <cstdint>
25 #include <iostream>
26 #include <memory>
27 #include <mutex>
28 #include <string>
29 #include <unordered_map>
30 #include <utility>
31 #include <vector>
32
33 #include "arrow/array.h"
34 #include "arrow/buffer.h"
35 #include "arrow/datum.h"
36 #include "arrow/status.h"
37 #include "arrow/table.h"
38 #include "arrow/type.h"
39 #include "arrow/type_traits.h"
40 #include "arrow/util/checked_cast.h"
41 #include "arrow/util/hashing.h"
42 #include "arrow/util/int_util.h"
43 #include "arrow/util/logging.h"
44 #include "arrow/util/macros.h"
45 #include "arrow/util/parallel.h"
46 #include "arrow/util/string_view.h"
47 #include "arrow/visitor_inline.h"
48
49 #include "arrow/compute/api.h"
50
51 #include "arrow/python/arrow_to_python_internal.h"
52 #include "arrow/python/common.h"
53 #include "arrow/python/datetime.h"
54 #include "arrow/python/decimal.h"
55 #include "arrow/python/helpers.h"
56 #include "arrow/python/numpy_convert.h"
57 #include "arrow/python/numpy_internal.h"
58 #include "arrow/python/pyarrow.h"
59 #include "arrow/python/python_to_arrow.h"
60 #include "arrow/python/type_traits.h"
61
62 namespace arrow {
63
64 class MemoryPool;
65
66 using internal::checked_cast;
67 using internal::CheckIndexBounds;
68 using internal::GetByteWidth;
69 using internal::OptionalParallelFor;
70
71 namespace py {
72 namespace {
73
74 // Fix options for conversion of an inner (child) array.
75 PandasOptions MakeInnerOptions(PandasOptions options) {
76 // Make sure conversion of inner dictionary arrays always returns an array,
77 // not a dict {'indices': array, 'dictionary': array, 'ordered': bool}
78 options.decode_dictionaries = true;
79 options.categorical_columns.clear();
80 options.strings_to_categorical = false;
81
82 // In ARROW-7723, we found as a result of ARROW-3789 that second
83 // through microsecond resolution tz-aware timestamps were being promoted to
84 // use the DATETIME_NANO_TZ conversion path, yielding a datetime64[ns] NumPy
85 // array in this function. PyArray_GETITEM returns datetime.datetime for
86 // units second through microsecond but PyLong for nanosecond (because
87 // datetime.datetime does not support nanoseconds).
88 // We force the object conversion to preserve the value of the timezone.
89 // Nanoseconds are returned as integers.
90 options.coerce_temporal_nanoseconds = false;
91
92 return options;
93 }
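// Illustrative usage (comment-only sketch): this helper is applied before
// converting a child array, as ConvertListsLike and ConvertStruct do further
// down. "flat_values" is a hypothetical std::shared_ptr<ChunkedArray> holding
// the flattened child values:
//
//   PandasOptions inner_options = MakeInnerOptions(options);
//   OwnedRefNoGIL child_result;
//   RETURN_NOT_OK(ConvertChunkedArrayToPandas(inner_options, flat_values,
//                                             nullptr, child_result.ref()));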
94
95 // ----------------------------------------------------------------------
96 // PyCapsule code for setting ndarray base to reference C++ object
97
98 struct ArrayCapsule {
99 std::shared_ptr<Array> array;
100 };
101
102 struct BufferCapsule {
103 std::shared_ptr<Buffer> buffer;
104 };
105
106 void ArrayCapsule_Destructor(PyObject* capsule) {
107 delete reinterpret_cast<ArrayCapsule*>(PyCapsule_GetPointer(capsule, "arrow::Array"));
108 }
109
110 void BufferCapsule_Destructor(PyObject* capsule) {
111 delete reinterpret_cast<BufferCapsule*>(PyCapsule_GetPointer(capsule, "arrow::Buffer"));
112 }
113
114 // ----------------------------------------------------------------------
115 // pandas 0.x DataFrame conversion internals
116
117 using internal::arrow_traits;
118 using internal::npy_traits;
119
120 template <typename T>
121 struct WrapBytes {};
122
123 template <>
124 struct WrapBytes<StringType> {
125 static inline PyObject* Wrap(const char* data, int64_t length) {
126 return PyUnicode_FromStringAndSize(data, length);
127 }
128 };
129
130 template <>
131 struct WrapBytes<LargeStringType> {
132 static inline PyObject* Wrap(const char* data, int64_t length) {
133 return PyUnicode_FromStringAndSize(data, length);
134 }
135 };
136
137 template <>
138 struct WrapBytes<BinaryType> {
139 static inline PyObject* Wrap(const char* data, int64_t length) {
140 return PyBytes_FromStringAndSize(data, length);
141 }
142 };
143
144 template <>
145 struct WrapBytes<LargeBinaryType> {
146 static inline PyObject* Wrap(const char* data, int64_t length) {
147 return PyBytes_FromStringAndSize(data, length);
148 }
149 };
150
151 template <>
152 struct WrapBytes<FixedSizeBinaryType> {
153 static inline PyObject* Wrap(const char* data, int64_t length) {
154 return PyBytes_FromStringAndSize(data, length);
155 }
156 };
157
158 static inline bool ListTypeSupported(const DataType& type) {
159 switch (type.id()) {
160 case Type::BOOL:
161 case Type::UINT8:
162 case Type::INT8:
163 case Type::UINT16:
164 case Type::INT16:
165 case Type::UINT32:
166 case Type::INT32:
167 case Type::INT64:
168 case Type::UINT64:
169 case Type::FLOAT:
170 case Type::DOUBLE:
171 case Type::DECIMAL128:
172 case Type::DECIMAL256:
173 case Type::BINARY:
174 case Type::LARGE_BINARY:
175 case Type::STRING:
176 case Type::LARGE_STRING:
177 case Type::DATE32:
178 case Type::DATE64:
179 case Type::STRUCT:
180 case Type::TIME32:
181 case Type::TIME64:
182 case Type::TIMESTAMP:
183 case Type::DURATION:
184 case Type::DICTIONARY:
185 case Type::NA: // empty list
186 // The above types are all supported.
187 return true;
188 case Type::FIXED_SIZE_LIST:
189 case Type::LIST:
190 case Type::LARGE_LIST: {
191 const auto& list_type = checked_cast<const BaseListType&>(type);
192 return ListTypeSupported(*list_type.value_type());
193 }
194 default:
195 break;
196 }
197 return false;
198 }
199
200 Status CapsulizeArray(const std::shared_ptr<Array>& arr, PyObject** out) {
201 auto capsule = new ArrayCapsule{{arr}};
202 *out = PyCapsule_New(reinterpret_cast<void*>(capsule), "arrow::Array",
203 &ArrayCapsule_Destructor);
204 if (*out == nullptr) {
205 delete capsule;
206 RETURN_IF_PYERROR();
207 }
208 return Status::OK();
209 }
210
211 Status CapsulizeBuffer(const std::shared_ptr<Buffer>& buffer, PyObject** out) {
212 auto capsule = new BufferCapsule{{buffer}};
213 *out = PyCapsule_New(reinterpret_cast<void*>(capsule), "arrow::Buffer",
214 &BufferCapsule_Destructor);
215 if (*out == nullptr) {
216 delete capsule;
217 RETURN_IF_PYERROR();
218 }
219 return Status::OK();
220 }
221
222 Status SetNdarrayBase(PyArrayObject* arr, PyObject* base) {
223 if (PyArray_SetBaseObject(arr, base) == -1) {
224 // Error occurred, trust that SetBaseObject sets the error state
225 Py_XDECREF(base);
226 RETURN_IF_PYERROR();
227 }
228 return Status::OK();
229 }
230
231 Status SetBufferBase(PyArrayObject* arr, const std::shared_ptr<Buffer>& buffer) {
232 PyObject* base;
233 RETURN_NOT_OK(CapsulizeBuffer(buffer, &base));
234 return SetNdarrayBase(arr, base);
235 }
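// Comment-only sketch of how the capsule machinery is used: a NumPy array
// that views Arrow-owned memory gets the capsule as its base object, so the
// underlying buffer stays alive for as long as the ndarray does. "np_values"
// is a hypothetical PyArrayObject* wrapping Arrow memory and "values_buffer"
// a std::shared_ptr<Buffer> owning that memory:
//
//   RETURN_NOT_OK(SetBufferBase(np_values, values_buffer));
//
// After this call, destroying the ndarray decrefs the capsule, which in turn
// releases the shared_ptr<Buffer>.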
236
237 inline void set_numpy_metadata(int type, const DataType* datatype, PyArray_Descr* out) {
238 auto metadata = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(out->c_metadata);
239 if (type == NPY_DATETIME) {
240 if (datatype->id() == Type::TIMESTAMP) {
241 const auto& timestamp_type = checked_cast<const TimestampType&>(*datatype);
242 metadata->meta.base = internal::NumPyFrequency(timestamp_type.unit());
243 } else {
244 DCHECK(false) << "NPY_DATETIME views only supported for Arrow TIMESTAMP types";
245 }
246 } else if (type == NPY_TIMEDELTA) {
247 DCHECK_EQ(datatype->id(), Type::DURATION);
248 const auto& duration_type = checked_cast<const DurationType&>(*datatype);
249 metadata->meta.base = internal::NumPyFrequency(duration_type.unit());
250 }
251 }
252
253 Status PyArray_NewFromPool(int nd, npy_intp* dims, PyArray_Descr* descr, MemoryPool* pool,
254 PyObject** out) {
255 // ARROW-6570: Allocate memory from MemoryPool for a couple reasons
256 //
257 // * Track allocations
258 // * Get better performance through custom allocators
259 int64_t total_size = descr->elsize;
260 for (int i = 0; i < nd; ++i) {
261 total_size *= dims[i];
262 }
263
264 ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateBuffer(total_size, pool));
265 *out = PyArray_NewFromDescr(&PyArray_Type, descr, nd, dims,
266 /*strides=*/nullptr,
267 /*data=*/buffer->mutable_data(),
268 /*flags=*/NPY_ARRAY_CARRAY | NPY_ARRAY_WRITEABLE,
269 /*obj=*/nullptr);
270 if (*out == nullptr) {
271 RETURN_IF_PYERROR();
272 // Trust that error set if NULL returned
273 }
274 return SetBufferBase(reinterpret_cast<PyArrayObject*>(*out), std::move(buffer));
275 }
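// For example, allocating a 2-column x 1000-row int64 block through this
// helper requests 8 * 2 * 1000 = 16,000 bytes from the given MemoryPool, so
// the allocation is visible in pool statistics such as bytes_allocated().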
276
277 template <typename T = void>
278 inline const T* GetPrimitiveValues(const Array& arr) {
279 if (arr.length() == 0) {
280 return nullptr;
281 }
282 const int elsize = GetByteWidth(*arr.type());
283 const auto& prim_arr = checked_cast<const PrimitiveArray&>(arr);
284 return reinterpret_cast<const T*>(prim_arr.values()->data() + arr.offset() * elsize);
285 }
286
287 Status MakeNumPyView(std::shared_ptr<Array> arr, PyObject* py_ref, int npy_type, int ndim,
288 npy_intp* dims, PyObject** out) {
289 PyAcquireGIL lock;
290
291 PyArray_Descr* descr = internal::GetSafeNumPyDtype(npy_type);
292 set_numpy_metadata(npy_type, arr->type().get(), descr);
293 PyObject* result = PyArray_NewFromDescr(
294 &PyArray_Type, descr, ndim, dims, /*strides=*/nullptr,
295 const_cast<void*>(GetPrimitiveValues(*arr)), /*flags=*/0, nullptr);
296 PyArrayObject* np_arr = reinterpret_cast<PyArrayObject*>(result);
297 if (np_arr == nullptr) {
298 // Error occurred, trust that error set
299     RETURN_IF_PYERROR();
300 }
301
302 PyObject* base;
303 if (py_ref == nullptr) {
304 // Capsule will be owned by the ndarray, no incref necessary. See
305 // ARROW-1973
306 RETURN_NOT_OK(CapsulizeArray(arr, &base));
307 } else {
308 Py_INCREF(py_ref);
309 base = py_ref;
310 }
311 RETURN_NOT_OK(SetNdarrayBase(np_arr, base));
312
313 // Do not allow Arrow data to be mutated
314 PyArray_CLEARFLAGS(np_arr, NPY_ARRAY_WRITEABLE);
315 *out = result;
316 return Status::OK();
317 }
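// Comment-only sketch: wrapping a single int64 chunk without copying, in the
// same way TypedPandasWriter::TransferSingle below wraps a 2-D block. "arr"
// is a hypothetical std::shared_ptr<Array> of int64 values:
//
//   npy_intp dims[1] = {static_cast<npy_intp>(arr->length())};
//   PyObject* wrapped;
//   RETURN_NOT_OK(MakeNumPyView(arr, /*py_ref=*/nullptr, NPY_INT64,
//                               /*ndim=*/1, dims, &wrapped));
//
// The resulting ndarray is read-only because it aliases Arrow memory.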
318
319 class PandasWriter {
320 public:
321 enum type {
322 OBJECT,
323 UINT8,
324 INT8,
325 UINT16,
326 INT16,
327 UINT32,
328 INT32,
329 UINT64,
330 INT64,
331 HALF_FLOAT,
332 FLOAT,
333 DOUBLE,
334 BOOL,
335 DATETIME_DAY,
336 DATETIME_SECOND,
337 DATETIME_MILLI,
338 DATETIME_MICRO,
339 DATETIME_NANO,
340 DATETIME_NANO_TZ,
341 TIMEDELTA_SECOND,
342 TIMEDELTA_MILLI,
343 TIMEDELTA_MICRO,
344 TIMEDELTA_NANO,
345 CATEGORICAL,
346 EXTENSION
347 };
348
349 PandasWriter(const PandasOptions& options, int64_t num_rows, int num_columns)
350 : options_(options), num_rows_(num_rows), num_columns_(num_columns) {}
351 virtual ~PandasWriter() {}
352
353 void SetBlockData(PyObject* arr) {
354 block_arr_.reset(arr);
355 block_data_ =
356 reinterpret_cast<uint8_t*>(PyArray_DATA(reinterpret_cast<PyArrayObject*>(arr)));
357 }
358
359 /// \brief Either copy or wrap single array to create pandas-compatible array
360 /// for Series or DataFrame. num_columns_ can only be 1. Will try to zero
361 /// copy if possible (or error if not possible and zero_copy_only=True)
362 virtual Status TransferSingle(std::shared_ptr<ChunkedArray> data, PyObject* py_ref) = 0;
363
364 /// \brief Copy ChunkedArray into a multi-column block
365 virtual Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) = 0;
366
367 Status EnsurePlacementAllocated() {
368 std::lock_guard<std::mutex> guard(allocation_lock_);
369 if (placement_data_ != nullptr) {
370 return Status::OK();
371 }
372 PyAcquireGIL lock;
373
374 npy_intp placement_dims[1] = {num_columns_};
375 PyObject* placement_arr = PyArray_SimpleNew(1, placement_dims, NPY_INT64);
376 RETURN_IF_PYERROR();
377 placement_arr_.reset(placement_arr);
378 placement_data_ = reinterpret_cast<int64_t*>(
379 PyArray_DATA(reinterpret_cast<PyArrayObject*>(placement_arr)));
380 return Status::OK();
381 }
382
383 Status EnsureAllocated() {
384 std::lock_guard<std::mutex> guard(allocation_lock_);
385 if (block_data_ != nullptr) {
386 return Status::OK();
387 }
388 RETURN_NOT_OK(Allocate());
389 return Status::OK();
390 }
391
392 virtual bool CanZeroCopy(const ChunkedArray& data) const { return false; }
393
394 virtual Status Write(std::shared_ptr<ChunkedArray> data, int64_t abs_placement,
395 int64_t rel_placement) {
396 RETURN_NOT_OK(EnsurePlacementAllocated());
397 if (num_columns_ == 1 && options_.allow_zero_copy_blocks) {
398 RETURN_NOT_OK(TransferSingle(data, /*py_ref=*/nullptr));
399 } else {
400 RETURN_NOT_OK(
401 CheckNoZeroCopy("Cannot do zero copy conversion into "
402 "multi-column DataFrame block"));
403 RETURN_NOT_OK(EnsureAllocated());
404 RETURN_NOT_OK(CopyInto(data, rel_placement));
405 }
406 placement_data_[rel_placement] = abs_placement;
407 return Status::OK();
408 }
409
410 virtual Status GetDataFrameResult(PyObject** out) {
411 PyObject* result = PyDict_New();
412 RETURN_IF_PYERROR();
413
414 PyObject* block;
415 RETURN_NOT_OK(GetResultBlock(&block));
416
417 PyDict_SetItemString(result, "block", block);
418 PyDict_SetItemString(result, "placement", placement_arr_.obj());
419
420 RETURN_NOT_OK(AddResultMetadata(result));
421 *out = result;
422 return Status::OK();
423 }
424
425 // Caller steals the reference to this object
426 virtual Status GetSeriesResult(PyObject** out) {
427 RETURN_NOT_OK(MakeBlock1D());
428 // Caller owns the object now
429 *out = block_arr_.detach();
430 return Status::OK();
431 }
432
433 protected:
434 virtual Status AddResultMetadata(PyObject* result) { return Status::OK(); }
435
436 Status MakeBlock1D() {
437     // For Series or for certain DataFrame block types, we need to reshape to a
438     // 1D array when there is only one column.
439 PyAcquireGIL lock;
440
441 DCHECK_EQ(1, num_columns_);
442
443 npy_intp new_dims[1] = {static_cast<npy_intp>(num_rows_)};
444 PyArray_Dims dims;
445 dims.ptr = new_dims;
446 dims.len = 1;
447
448 PyObject* reshaped = PyArray_Newshape(
449 reinterpret_cast<PyArrayObject*>(block_arr_.obj()), &dims, NPY_ANYORDER);
450 RETURN_IF_PYERROR();
451
452 // ARROW-8801: Here a PyArrayObject is created that is not being managed by
453 // any OwnedRef object. This object is then put in the resulting object
454 // with PyDict_SetItemString, which increments the reference count, so a
455 // memory leak ensues. There are several ways to fix the memory leak but a
456 // simple one is to put the reshaped 1D block array in this OwnedRefNoGIL
457 // so it will be correctly decref'd when this class is destructed.
458 block_arr_.reset(reshaped);
459 return Status::OK();
460 }
461
462 virtual Status GetResultBlock(PyObject** out) {
463 *out = block_arr_.obj();
464 return Status::OK();
465 }
466
467 Status CheckNoZeroCopy(const std::string& message) {
468 if (options_.zero_copy_only) {
469 return Status::Invalid(message);
470 }
471 return Status::OK();
472 }
473
474 Status CheckNotZeroCopyOnly(const ChunkedArray& data) {
475 if (options_.zero_copy_only) {
476 return Status::Invalid("Needed to copy ", data.num_chunks(), " chunks with ",
477 data.null_count(), " nulls, but zero_copy_only was True");
478 }
479 return Status::OK();
480 }
481
482 virtual Status Allocate() {
483 return Status::NotImplemented("Override Allocate in subclasses");
484 }
485
486 Status AllocateNDArray(int npy_type, int ndim = 2) {
487 PyAcquireGIL lock;
488
489 PyObject* block_arr;
490 npy_intp block_dims[2] = {0, 0};
491
492 if (ndim == 2) {
493 block_dims[0] = num_columns_;
494 block_dims[1] = num_rows_;
495 } else {
496 block_dims[0] = num_rows_;
497 }
498 PyArray_Descr* descr = internal::GetSafeNumPyDtype(npy_type);
499 if (PyDataType_REFCHK(descr)) {
500 // ARROW-6876: if the array has refcounted items, let Numpy
501 // own the array memory so as to decref elements on array destruction
502 block_arr = PyArray_SimpleNewFromDescr(ndim, block_dims, descr);
503 RETURN_IF_PYERROR();
504 } else {
505 RETURN_NOT_OK(
506 PyArray_NewFromPool(ndim, block_dims, descr, options_.pool, &block_arr));
507 }
508
509 SetBlockData(block_arr);
510 return Status::OK();
511 }
512
513 void SetDatetimeUnit(NPY_DATETIMEUNIT unit) {
514 PyAcquireGIL lock;
515 auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(
516 PyArray_DESCR(reinterpret_cast<PyArrayObject*>(block_arr_.obj()))->c_metadata);
517 date_dtype->meta.base = unit;
518 }
519
520 PandasOptions options_;
521
522 std::mutex allocation_lock_;
523
524 int64_t num_rows_;
525 int num_columns_;
526
527 OwnedRefNoGIL block_arr_;
528 uint8_t* block_data_ = nullptr;
529
530   // ndarray<int64>
531 OwnedRefNoGIL placement_arr_;
532 int64_t* placement_data_ = nullptr;
533
534 private:
535 ARROW_DISALLOW_COPY_AND_ASSIGN(PandasWriter);
536 };
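// Comment-only sketch of the writer protocol for a two-column block ("writer",
// "col0", "col1" and the placements are hypothetical):
//
//   RETURN_NOT_OK(writer->Write(col0, /*abs_placement=*/3, /*rel_placement=*/0));
//   RETURN_NOT_OK(writer->Write(col1, /*abs_placement=*/7, /*rel_placement=*/1));
//   PyObject* block_dict;
//   RETURN_NOT_OK(writer->GetDataFrameResult(&block_dict));
//
// The resulting dict maps "block" to the 2-D ndarray and "placement" to the
// int64 array [3, 7], which the pandas-side code uses to position columns.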
537
538 template <typename InType, typename OutType>
539 inline void ConvertIntegerWithNulls(const PandasOptions& options,
540 const ChunkedArray& data, OutType* out_values) {
541 for (int c = 0; c < data.num_chunks(); c++) {
542 const auto& arr = *data.chunk(c);
543 const InType* in_values = GetPrimitiveValues<InType>(arr);
544     // Upcast to the floating-point output type, writing NaN for nulls
545
546     for (int64_t i = 0; i < arr.length(); ++i) {
547 *out_values++ =
548 arr.IsNull(i) ? static_cast<OutType>(NAN) : static_cast<OutType>(in_values[i]);
549 }
550 }
551 }
552
553 template <typename T>
554 inline void ConvertIntegerNoNullsSameType(const PandasOptions& options,
555 const ChunkedArray& data, T* out_values) {
556 for (int c = 0; c < data.num_chunks(); c++) {
557 const auto& arr = *data.chunk(c);
558 if (arr.length() > 0) {
559 const T* in_values = GetPrimitiveValues<T>(arr);
560 memcpy(out_values, in_values, sizeof(T) * arr.length());
561 out_values += arr.length();
562 }
563 }
564 }
565
566 template <typename InType, typename OutType>
567 inline void ConvertIntegerNoNullsCast(const PandasOptions& options,
568 const ChunkedArray& data, OutType* out_values) {
569 for (int c = 0; c < data.num_chunks(); c++) {
570 const auto& arr = *data.chunk(c);
571 const InType* in_values = GetPrimitiveValues<InType>(arr);
572 for (int64_t i = 0; i < arr.length(); ++i) {
573       *out_values++ = static_cast<OutType>(in_values[i]);
574 }
575 }
576 }
577
578 template <typename T, typename Enable = void>
579 struct MemoizationTraits {
580 using Scalar = typename T::c_type;
581 };
582
583 template <typename T>
584 struct MemoizationTraits<T, enable_if_has_string_view<T>> {
585 // For binary, we memoize string_view as a scalar value to avoid having to
586 // unnecessarily copy the memory into the memo table data structure
587 using Scalar = util::string_view;
588 };
589
590 // Generic Array -> PyObject** converter that handles object deduplication, if
591 // requested
592 template <typename Type, typename WrapFunction>
593 inline Status ConvertAsPyObjects(const PandasOptions& options, const ChunkedArray& data,
594 WrapFunction&& wrap_func, PyObject** out_values) {
595 using ArrayType = typename TypeTraits<Type>::ArrayType;
596 using Scalar = typename MemoizationTraits<Type>::Scalar;
597
598 ::arrow::internal::ScalarMemoTable<Scalar> memo_table(options.pool);
599 std::vector<PyObject*> unique_values;
600 int32_t memo_size = 0;
601
602 auto WrapMemoized = [&](const Scalar& value, PyObject** out_values) {
603 int32_t memo_index;
604 RETURN_NOT_OK(memo_table.GetOrInsert(value, &memo_index));
605 if (memo_index == memo_size) {
606 // New entry
607 RETURN_NOT_OK(wrap_func(value, out_values));
608 unique_values.push_back(*out_values);
609 ++memo_size;
610 } else {
611 // Duplicate entry
612 Py_INCREF(unique_values[memo_index]);
613 *out_values = unique_values[memo_index];
614 }
615 return Status::OK();
616 };
617
618 auto WrapUnmemoized = [&](const Scalar& value, PyObject** out_values) {
619 return wrap_func(value, out_values);
620 };
621
622 for (int c = 0; c < data.num_chunks(); c++) {
623 const auto& arr = arrow::internal::checked_cast<const ArrayType&>(*data.chunk(c));
624 if (options.deduplicate_objects) {
625 RETURN_NOT_OK(internal::WriteArrayObjects(arr, WrapMemoized, out_values));
626 } else {
627 RETURN_NOT_OK(internal::WriteArrayObjects(arr, WrapUnmemoized, out_values));
628 }
629 out_values += arr.length();
630 }
631 return Status::OK();
632 }
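// Comment-only sketch: a minimal wrap callback for int64 values, in the same
// style as the integer path of ObjectWriterVisitor further down:
//
//   auto wrap_int64 = [](int64_t value, PyObject** out) {
//     *out = PyLong_FromLongLong(value);
//     RETURN_IF_PYERROR();
//     return Status::OK();
//   };
//   RETURN_NOT_OK(ConvertAsPyObjects<Int64Type>(options, data, wrap_int64,
//                                               out_values));
//
// When options.deduplicate_objects is set, repeated values share a single
// memoized PyObject instead of allocating a new one per row.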
633
634 Status ConvertStruct(PandasOptions options, const ChunkedArray& data,
635 PyObject** out_values) {
636 if (data.num_chunks() == 0) {
637 return Status::OK();
638 }
639 // ChunkedArray has at least one chunk
640 auto arr = checked_cast<const StructArray*>(data.chunk(0).get());
641 // Use it to cache the struct type and number of fields for all chunks
642 int32_t num_fields = arr->num_fields();
643 auto array_type = arr->type();
644 std::vector<OwnedRef> fields_data(num_fields * data.num_chunks());
645 OwnedRef dict_item;
646
647 // See notes in MakeInnerOptions.
648 options = MakeInnerOptions(std::move(options));
649 // Don't blindly convert because timestamps in lists are handled differently.
650 options.timestamp_as_object = true;
651
652 for (int c = 0; c < data.num_chunks(); c++) {
653 auto fields_data_offset = c * num_fields;
654 auto arr = checked_cast<const StructArray*>(data.chunk(c).get());
655 // Convert the struct arrays first
656 for (int32_t i = 0; i < num_fields; i++) {
657 const auto field = arr->field(static_cast<int>(i));
658 RETURN_NOT_OK(ConvertArrayToPandas(options, field, nullptr,
659 fields_data[i + fields_data_offset].ref()));
660 DCHECK(PyArray_Check(fields_data[i + fields_data_offset].obj()));
661 }
662
663 // Construct a dictionary for each row
664 const bool has_nulls = data.null_count() > 0;
665 for (int64_t i = 0; i < arr->length(); ++i) {
666 if (has_nulls && arr->IsNull(i)) {
667 Py_INCREF(Py_None);
668 *out_values = Py_None;
669 } else {
670 // Build the new dict object for the row
671 dict_item.reset(PyDict_New());
672 RETURN_IF_PYERROR();
673 for (int32_t field_idx = 0; field_idx < num_fields; ++field_idx) {
674 OwnedRef field_value;
675 auto name = array_type->field(static_cast<int>(field_idx))->name();
676 if (!arr->field(static_cast<int>(field_idx))->IsNull(i)) {
677 // Value exists in child array, obtain it
678 auto array = reinterpret_cast<PyArrayObject*>(
679 fields_data[field_idx + fields_data_offset].obj());
680 auto ptr = reinterpret_cast<const char*>(PyArray_GETPTR1(array, i));
681 field_value.reset(PyArray_GETITEM(array, ptr));
682 RETURN_IF_PYERROR();
683 } else {
684 // Translate the Null to a None
685 Py_INCREF(Py_None);
686 field_value.reset(Py_None);
687 }
688 // PyDict_SetItemString increments reference count
689 auto setitem_result =
690 PyDict_SetItemString(dict_item.obj(), name.c_str(), field_value.obj());
691 RETURN_IF_PYERROR();
692 DCHECK_EQ(setitem_result, 0);
693 }
694 *out_values = dict_item.obj();
695 // Grant ownership to the resulting array
696 Py_INCREF(*out_values);
697 }
698 ++out_values;
699 }
700 }
701 return Status::OK();
702 }
703
704 Status DecodeDictionaries(MemoryPool* pool, const std::shared_ptr<DataType>& dense_type,
705 ArrayVector* arrays) {
706 compute::ExecContext ctx(pool);
707 compute::CastOptions options;
708 for (size_t i = 0; i < arrays->size(); ++i) {
709 ARROW_ASSIGN_OR_RAISE((*arrays)[i],
710 compute::Cast(*(*arrays)[i], dense_type, options, &ctx));
711 }
712 return Status::OK();
713 }
714
715 Status DecodeDictionaries(MemoryPool* pool, const std::shared_ptr<DataType>& dense_type,
716 std::shared_ptr<ChunkedArray>* array) {
717 auto chunks = (*array)->chunks();
718 RETURN_NOT_OK(DecodeDictionaries(pool, dense_type, &chunks));
719 *array = std::make_shared<ChunkedArray>(std::move(chunks), dense_type);
720 return Status::OK();
721 }
722
723 template <typename ListArrayT>
724 Status ConvertListsLike(PandasOptions options, const ChunkedArray& data,
725 PyObject** out_values) {
726 // Get column of underlying value arrays
727 ArrayVector value_arrays;
728 for (int c = 0; c < data.num_chunks(); c++) {
729 const auto& arr = checked_cast<const ListArrayT&>(*data.chunk(c));
730 value_arrays.emplace_back(arr.values());
731 }
732 using ListArrayType = typename ListArrayT::TypeClass;
733 const auto& list_type = checked_cast<const ListArrayType&>(*data.type());
734 auto value_type = list_type.value_type();
735
736 auto flat_column = std::make_shared<ChunkedArray>(value_arrays, value_type);
737
738 options = MakeInnerOptions(std::move(options));
739
740 OwnedRefNoGIL owned_numpy_array;
741 RETURN_NOT_OK(ConvertChunkedArrayToPandas(options, flat_column, nullptr,
742 owned_numpy_array.ref()));
743
744 PyObject* numpy_array = owned_numpy_array.obj();
745 DCHECK(PyArray_Check(numpy_array));
746
747 int64_t chunk_offset = 0;
748 for (int c = 0; c < data.num_chunks(); c++) {
749 const auto& arr = checked_cast<const ListArrayT&>(*data.chunk(c));
750
751 const bool has_nulls = data.null_count() > 0;
752 for (int64_t i = 0; i < arr.length(); ++i) {
753 if (has_nulls && arr.IsNull(i)) {
754 Py_INCREF(Py_None);
755 *out_values = Py_None;
756 } else {
757 OwnedRef start(PyLong_FromLongLong(arr.value_offset(i) + chunk_offset));
758 OwnedRef end(PyLong_FromLongLong(arr.value_offset(i + 1) + chunk_offset));
759 OwnedRef slice(PySlice_New(start.obj(), end.obj(), nullptr));
760
761 if (ARROW_PREDICT_FALSE(slice.obj() == nullptr)) {
762 // Fall out of loop, will return from RETURN_IF_PYERROR
763 break;
764 }
765 *out_values = PyObject_GetItem(numpy_array, slice.obj());
766
767 if (*out_values == nullptr) {
768 // Fall out of loop, will return from RETURN_IF_PYERROR
769 break;
770 }
771 }
772 ++out_values;
773 }
774 RETURN_IF_PYERROR();
775
776 chunk_offset += arr.values()->length();
777 }
778
779 return Status::OK();
780 }
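// For example, a list<int64> column with rows [[1, 2], null, [3]] converts
// the flattened child values into one ndarray [1, 2, 3]; row 0 becomes the
// slice [0:2], row 1 becomes None, and row 2 becomes the slice [2:3]. The
// slices are views into the flat array, not copies.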
781
782 Status ConvertMap(PandasOptions options, const ChunkedArray& data,
783 PyObject** out_values) {
784 // Get columns of underlying key/item arrays
785 std::vector<std::shared_ptr<Array>> key_arrays;
786 std::vector<std::shared_ptr<Array>> item_arrays;
787 for (int c = 0; c < data.num_chunks(); ++c) {
788 const auto& map_arr = checked_cast<const MapArray&>(*data.chunk(c));
789 key_arrays.emplace_back(map_arr.keys());
790 item_arrays.emplace_back(map_arr.items());
791 }
792
793 const auto& map_type = checked_cast<const MapType&>(*data.type());
794 auto key_type = map_type.key_type();
795 auto item_type = map_type.item_type();
796
797 // ARROW-6899: Convert dictionary-encoded children to dense instead of
798 // failing below. A more efficient conversion than this could be done later
799 if (key_type->id() == Type::DICTIONARY) {
800 auto dense_type = checked_cast<const DictionaryType&>(*key_type).value_type();
801 RETURN_NOT_OK(DecodeDictionaries(options.pool, dense_type, &key_arrays));
802 key_type = dense_type;
803 }
804 if (item_type->id() == Type::DICTIONARY) {
805 auto dense_type = checked_cast<const DictionaryType&>(*item_type).value_type();
806 RETURN_NOT_OK(DecodeDictionaries(options.pool, dense_type, &item_arrays));
807 item_type = dense_type;
808 }
809
810 // See notes in MakeInnerOptions.
811 options = MakeInnerOptions(std::move(options));
812 // Don't blindly convert because timestamps in lists are handled differently.
813 options.timestamp_as_object = true;
814
815 auto flat_keys = std::make_shared<ChunkedArray>(key_arrays, key_type);
816 auto flat_items = std::make_shared<ChunkedArray>(item_arrays, item_type);
817 OwnedRef list_item;
818 OwnedRef key_value;
819 OwnedRef item_value;
820 OwnedRefNoGIL owned_numpy_keys;
821 RETURN_NOT_OK(
822 ConvertChunkedArrayToPandas(options, flat_keys, nullptr, owned_numpy_keys.ref()));
823 OwnedRefNoGIL owned_numpy_items;
824 RETURN_NOT_OK(
825 ConvertChunkedArrayToPandas(options, flat_items, nullptr, owned_numpy_items.ref()));
826 PyArrayObject* py_keys = reinterpret_cast<PyArrayObject*>(owned_numpy_keys.obj());
827 PyArrayObject* py_items = reinterpret_cast<PyArrayObject*>(owned_numpy_items.obj());
828
829 int64_t chunk_offset = 0;
830 for (int c = 0; c < data.num_chunks(); ++c) {
831 const auto& arr = checked_cast<const MapArray&>(*data.chunk(c));
832 const bool has_nulls = data.null_count() > 0;
833
834 // Make a list of key/item pairs for each row in array
835 for (int64_t i = 0; i < arr.length(); ++i) {
836 if (has_nulls && arr.IsNull(i)) {
837 Py_INCREF(Py_None);
838 *out_values = Py_None;
839 } else {
840 int64_t entry_offset = arr.value_offset(i);
841 int64_t num_maps = arr.value_offset(i + 1) - entry_offset;
842
843 // Build the new list object for the row of maps
844 list_item.reset(PyList_New(num_maps));
845 RETURN_IF_PYERROR();
846
847 // Add each key/item pair in the row
848 for (int64_t j = 0; j < num_maps; ++j) {
849 // Get key value, key is non-nullable for a valid row
850 auto ptr_key = reinterpret_cast<const char*>(
851 PyArray_GETPTR1(py_keys, chunk_offset + entry_offset + j));
852 key_value.reset(PyArray_GETITEM(py_keys, ptr_key));
853 RETURN_IF_PYERROR();
854
855 if (item_arrays[c]->IsNull(entry_offset + j)) {
856 // Translate the Null to a None
857 Py_INCREF(Py_None);
858 item_value.reset(Py_None);
859 } else {
860 // Get valid value from item array
861 auto ptr_item = reinterpret_cast<const char*>(
862 PyArray_GETPTR1(py_items, chunk_offset + entry_offset + j));
863 item_value.reset(PyArray_GETITEM(py_items, ptr_item));
864 RETURN_IF_PYERROR();
865 }
866
867 // Add the key/item pair to the list for the row
868 PyList_SET_ITEM(list_item.obj(), j,
869 PyTuple_Pack(2, key_value.obj(), item_value.obj()));
870 RETURN_IF_PYERROR();
871 }
872
873 // Pass ownership to the resulting array
874 *out_values = list_item.detach();
875 }
876 ++out_values;
877 }
878 RETURN_IF_PYERROR();
879
880 chunk_offset += arr.values()->length();
881 }
882
883 return Status::OK();
884 }
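// For example, a map<string, int64> row {"a": 1, "b": null} becomes the
// Python list [("a", 1), ("b", None)]; a null row becomes None.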
885
886 template <typename InType, typename OutType>
887 inline void ConvertNumericNullable(const ChunkedArray& data, InType na_value,
888 OutType* out_values) {
889 for (int c = 0; c < data.num_chunks(); c++) {
890 const auto& arr = *data.chunk(c);
891 const InType* in_values = GetPrimitiveValues<InType>(arr);
892
893 if (arr.null_count() > 0) {
894 for (int64_t i = 0; i < arr.length(); ++i) {
895 *out_values++ = arr.IsNull(i) ? na_value : in_values[i];
896 }
897 } else {
898 memcpy(out_values, in_values, sizeof(InType) * arr.length());
899 out_values += arr.length();
900 }
901 }
902 }
903
904 template <typename InType, typename OutType>
905 inline void ConvertNumericNullableCast(const ChunkedArray& data, InType na_value,
906 OutType* out_values) {
907 for (int c = 0; c < data.num_chunks(); c++) {
908 const auto& arr = *data.chunk(c);
909 const InType* in_values = GetPrimitiveValues<InType>(arr);
910
911 for (int64_t i = 0; i < arr.length(); ++i) {
912 *out_values++ = arr.IsNull(i) ? static_cast<OutType>(na_value)
913 : static_cast<OutType>(in_values[i]);
914 }
915 }
916 }
917
918 template <int NPY_TYPE>
919 class TypedPandasWriter : public PandasWriter {
920 public:
921 using T = typename npy_traits<NPY_TYPE>::value_type;
922
923 using PandasWriter::PandasWriter;
924
925 Status TransferSingle(std::shared_ptr<ChunkedArray> data, PyObject* py_ref) override {
926 if (CanZeroCopy(*data)) {
927 PyObject* wrapped;
928 npy_intp dims[2] = {static_cast<npy_intp>(num_columns_),
929 static_cast<npy_intp>(num_rows_)};
930 RETURN_NOT_OK(
931 MakeNumPyView(data->chunk(0), py_ref, NPY_TYPE, /*ndim=*/2, dims, &wrapped));
932 SetBlockData(wrapped);
933 return Status::OK();
934 } else {
935 RETURN_NOT_OK(CheckNotZeroCopyOnly(*data));
936 RETURN_NOT_OK(EnsureAllocated());
937 return CopyInto(data, /*rel_placement=*/0);
938 }
939 }
940
941 Status CheckTypeExact(const DataType& type, Type::type expected) {
942 if (type.id() != expected) {
943 // TODO(wesm): stringify NumPy / pandas type
944 return Status::NotImplemented("Cannot write Arrow data of type ", type.ToString());
945 }
946 return Status::OK();
947 }
948
949 T* GetBlockColumnStart(int64_t rel_placement) {
950 return reinterpret_cast<T*>(block_data_) + rel_placement * num_rows_;
951 }
952
953 protected:
954 Status Allocate() override { return AllocateNDArray(NPY_TYPE); }
955 };
956
957 struct ObjectWriterVisitor {
958 const PandasOptions& options;
959 const ChunkedArray& data;
960 PyObject** out_values;
961
962 Status Visit(const NullType& type) {
963 for (int c = 0; c < data.num_chunks(); c++) {
964 std::shared_ptr<Array> arr = data.chunk(c);
965
966 for (int64_t i = 0; i < arr->length(); ++i) {
967 // All values are null
968 Py_INCREF(Py_None);
969 *out_values = Py_None;
970 ++out_values;
971 }
972 }
973 return Status::OK();
974 }
975
976 Status Visit(const BooleanType& type) {
977 for (int c = 0; c < data.num_chunks(); c++) {
978 const auto& arr = checked_cast<const BooleanArray&>(*data.chunk(c));
979
980 for (int64_t i = 0; i < arr.length(); ++i) {
981 if (arr.IsNull(i)) {
982 Py_INCREF(Py_None);
983 *out_values++ = Py_None;
984 } else if (arr.Value(i)) {
985 // True
986 Py_INCREF(Py_True);
987 *out_values++ = Py_True;
988 } else {
989 // False
990 Py_INCREF(Py_False);
991 *out_values++ = Py_False;
992 }
993 }
994 }
995 return Status::OK();
996 }
997
998 template <typename Type>
999 enable_if_integer<Type, Status> Visit(const Type& type) {
1000 using T = typename Type::c_type;
1001 auto WrapValue = [](T value, PyObject** out) {
1002 *out = std::is_signed<T>::value ? PyLong_FromLongLong(value)
1003 : PyLong_FromUnsignedLongLong(value);
1004 RETURN_IF_PYERROR();
1005 return Status::OK();
1006 };
1007 return ConvertAsPyObjects<Type>(options, data, WrapValue, out_values);
1008 }
1009
1010 template <typename Type>
1011 enable_if_t<is_base_binary_type<Type>::value || is_fixed_size_binary_type<Type>::value,
1012 Status>
1013 Visit(const Type& type) {
1014 auto WrapValue = [](const util::string_view& view, PyObject** out) {
1015 *out = WrapBytes<Type>::Wrap(view.data(), view.length());
1016 if (*out == nullptr) {
1017 PyErr_Clear();
1018 return Status::UnknownError("Wrapping ", view, " failed");
1019 }
1020 return Status::OK();
1021 };
1022 return ConvertAsPyObjects<Type>(options, data, WrapValue, out_values);
1023 }
1024
1025 template <typename Type>
1026 enable_if_date<Type, Status> Visit(const Type& type) {
1027 auto WrapValue = [](typename Type::c_type value, PyObject** out) {
1028 RETURN_NOT_OK(internal::PyDate_from_int(value, Type::UNIT, out));
1029 RETURN_IF_PYERROR();
1030 return Status::OK();
1031 };
1032 return ConvertAsPyObjects<Type>(options, data, WrapValue, out_values);
1033 }
1034
1035 template <typename Type>
1036 enable_if_time<Type, Status> Visit(const Type& type) {
1037 const TimeUnit::type unit = type.unit();
1038 auto WrapValue = [unit](typename Type::c_type value, PyObject** out) {
1039 RETURN_NOT_OK(internal::PyTime_from_int(value, unit, out));
1040 RETURN_IF_PYERROR();
1041 return Status::OK();
1042 };
1043 return ConvertAsPyObjects<Type>(options, data, WrapValue, out_values);
1044 }
1045
1046 template <typename Type>
1047 enable_if_timestamp<Type, Status> Visit(const Type& type) {
1048 const TimeUnit::type unit = type.unit();
1049 OwnedRef tzinfo;
1050
1051 auto ConvertTimezoneNaive = [&](typename Type::c_type value, PyObject** out) {
1052 RETURN_NOT_OK(internal::PyDateTime_from_int(value, unit, out));
1053 RETURN_IF_PYERROR();
1054 return Status::OK();
1055 };
1056 auto ConvertTimezoneAware = [&](typename Type::c_type value, PyObject** out) {
1057 PyObject* naive_datetime;
1058 RETURN_NOT_OK(ConvertTimezoneNaive(value, &naive_datetime));
1059 // convert the timezone naive datetime object to timezone aware
1060 *out = PyObject_CallMethod(tzinfo.obj(), "fromutc", "O", naive_datetime);
1061 // the timezone naive object is no longer required
1062 Py_DECREF(naive_datetime);
1063 RETURN_IF_PYERROR();
1064 return Status::OK();
1065 };
1066
1067 if (!type.timezone().empty() && !options.ignore_timezone) {
1068 // convert timezone aware
1069 PyObject* tzobj;
1070 ARROW_ASSIGN_OR_RAISE(tzobj, internal::StringToTzinfo(type.timezone()));
1071 tzinfo.reset(tzobj);
1072 RETURN_IF_PYERROR();
1073 RETURN_NOT_OK(
1074 ConvertAsPyObjects<Type>(options, data, ConvertTimezoneAware, out_values));
1075 } else {
1076 // convert timezone naive
1077 RETURN_NOT_OK(
1078 ConvertAsPyObjects<Type>(options, data, ConvertTimezoneNaive, out_values));
1079 }
1080
1081 return Status::OK();
1082 }
1083
1084 template <typename Type>
1085 enable_if_t<std::is_same<Type, MonthDayNanoIntervalType>::value, Status> Visit(
1086 const Type& type) {
1087 OwnedRef args(PyTuple_New(0));
1088 OwnedRef kwargs(PyDict_New());
1089 RETURN_IF_PYERROR();
1090 auto to_date_offset = [&](const MonthDayNanoIntervalType::MonthDayNanos& interval,
1091 PyObject** out) {
1092 DCHECK(internal::BorrowPandasDataOffsetType() != nullptr);
1093     // As of pandas 1.3.3, DateOffset objects do not add a nanoseconds
1094     // component to pd.Timestamp
1095     // (https://github.com/pandas-dev/pandas/issues/43892).
1096     // So split the value into microseconds plus a nanosecond remainder to
1097     // preserve the data while giving users the results they expect.
1098 int64_t microseconds = interval.nanoseconds / 1000;
1099 int64_t nanoseconds;
1100 if (interval.nanoseconds >= 0) {
1101 nanoseconds = interval.nanoseconds % 1000;
1102 } else {
1103 nanoseconds = -((-interval.nanoseconds) % 1000);
1104 }
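// Worked example: interval.nanoseconds == -1234567 yields
// microseconds == -1234 (truncation toward zero) and nanoseconds == -567,
// so microseconds * 1000 + nanoseconds reproduces the original value.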
1105
1106 PyDict_SetItemString(kwargs.obj(), "months", PyLong_FromLong(interval.months));
1107 PyDict_SetItemString(kwargs.obj(), "days", PyLong_FromLong(interval.days));
1108 PyDict_SetItemString(kwargs.obj(), "microseconds",
1109 PyLong_FromLongLong(microseconds));
1110 PyDict_SetItemString(kwargs.obj(), "nanoseconds", PyLong_FromLongLong(nanoseconds));
1111 *out =
1112 PyObject_Call(internal::BorrowPandasDataOffsetType(), args.obj(), kwargs.obj());
1113 RETURN_IF_PYERROR();
1114 return Status::OK();
1115 };
1116 return ConvertAsPyObjects<MonthDayNanoIntervalType>(options, data, to_date_offset,
1117 out_values);
1118 }
1119
1120 Status Visit(const Decimal128Type& type) {
1121 OwnedRef decimal;
1122 OwnedRef Decimal;
1123 RETURN_NOT_OK(internal::ImportModule("decimal", &decimal));
1124 RETURN_NOT_OK(internal::ImportFromModule(decimal.obj(), "Decimal", &Decimal));
1125 PyObject* decimal_constructor = Decimal.obj();
1126
1127 for (int c = 0; c < data.num_chunks(); c++) {
1128 const auto& arr = checked_cast<const arrow::Decimal128Array&>(*data.chunk(c));
1129
1130 for (int64_t i = 0; i < arr.length(); ++i) {
1131 if (arr.IsNull(i)) {
1132 Py_INCREF(Py_None);
1133 *out_values++ = Py_None;
1134 } else {
1135 *out_values++ =
1136 internal::DecimalFromString(decimal_constructor, arr.FormatValue(i));
1137 RETURN_IF_PYERROR();
1138 }
1139 }
1140 }
1141
1142 return Status::OK();
1143 }
1144
1145 Status Visit(const Decimal256Type& type) {
1146 OwnedRef decimal;
1147 OwnedRef Decimal;
1148 RETURN_NOT_OK(internal::ImportModule("decimal", &decimal));
1149 RETURN_NOT_OK(internal::ImportFromModule(decimal.obj(), "Decimal", &Decimal));
1150 PyObject* decimal_constructor = Decimal.obj();
1151
1152 for (int c = 0; c < data.num_chunks(); c++) {
1153 const auto& arr = checked_cast<const arrow::Decimal256Array&>(*data.chunk(c));
1154
1155 for (int64_t i = 0; i < arr.length(); ++i) {
1156 if (arr.IsNull(i)) {
1157 Py_INCREF(Py_None);
1158 *out_values++ = Py_None;
1159 } else {
1160 *out_values++ =
1161 internal::DecimalFromString(decimal_constructor, arr.FormatValue(i));
1162 RETURN_IF_PYERROR();
1163 }
1164 }
1165 }
1166
1167 return Status::OK();
1168 }
1169
1170 template <typename T>
1171 enable_if_t<is_fixed_size_list_type<T>::value || is_var_length_list_type<T>::value,
1172 Status>
1173 Visit(const T& type) {
1174 using ArrayType = typename TypeTraits<T>::ArrayType;
1175 if (!ListTypeSupported(*type.value_type())) {
1176 return Status::NotImplemented(
1177 "Not implemented type for conversion from List to Pandas: ",
1178 type.value_type()->ToString());
1179 }
1180 return ConvertListsLike<ArrayType>(options, data, out_values);
1181 }
1182
1183 Status Visit(const MapType& type) { return ConvertMap(options, data, out_values); }
1184
1185 Status Visit(const StructType& type) {
1186 return ConvertStruct(options, data, out_values);
1187 }
1188
1189 template <typename Type>
1190 enable_if_t<is_floating_type<Type>::value ||
1191 std::is_same<DictionaryType, Type>::value ||
1192 std::is_same<DurationType, Type>::value ||
1193 std::is_same<ExtensionType, Type>::value ||
1194 (std::is_base_of<IntervalType, Type>::value &&
1195 !std::is_same<MonthDayNanoIntervalType, Type>::value) ||
1196 std::is_base_of<UnionType, Type>::value,
1197 Status>
1198 Visit(const Type& type) {
1199 return Status::NotImplemented("No implemented conversion to object dtype: ",
1200 type.ToString());
1201 }
1202 };
1203
1204 class ObjectWriter : public TypedPandasWriter<NPY_OBJECT> {
1205 public:
1206 using TypedPandasWriter<NPY_OBJECT>::TypedPandasWriter;
1207 Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
1208 PyAcquireGIL lock;
1209 ObjectWriterVisitor visitor{this->options_, *data,
1210 this->GetBlockColumnStart(rel_placement)};
1211 return VisitTypeInline(*data->type(), &visitor);
1212 }
1213 };
1214
1215 static inline bool IsNonNullContiguous(const ChunkedArray& data) {
1216 return data.num_chunks() == 1 && data.null_count() == 0;
1217 }
1218
1219 template <int NPY_TYPE>
1220 class IntWriter : public TypedPandasWriter<NPY_TYPE> {
1221 public:
1222 using ArrowType = typename npy_traits<NPY_TYPE>::TypeClass;
1223 using TypedPandasWriter<NPY_TYPE>::TypedPandasWriter;
1224
1225 bool CanZeroCopy(const ChunkedArray& data) const override {
1226 return IsNonNullContiguous(data);
1227 }
1228
1229 Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
1230 RETURN_NOT_OK(this->CheckTypeExact(*data->type(), ArrowType::type_id));
1231 ConvertIntegerNoNullsSameType<typename ArrowType::c_type>(
1232 this->options_, *data, this->GetBlockColumnStart(rel_placement));
1233 return Status::OK();
1234 }
1235 };
1236
1237 template <int NPY_TYPE>
1238 class FloatWriter : public TypedPandasWriter<NPY_TYPE> {
1239 public:
1240 using ArrowType = typename npy_traits<NPY_TYPE>::TypeClass;
1241 using TypedPandasWriter<NPY_TYPE>::TypedPandasWriter;
1242 using T = typename ArrowType::c_type;
1243
1244 bool CanZeroCopy(const ChunkedArray& data) const override {
1245 return IsNonNullContiguous(data) && data.type()->id() == ArrowType::type_id;
1246 }
1247
1248 Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
1249 Type::type in_type = data->type()->id();
1250 auto out_values = this->GetBlockColumnStart(rel_placement);
1251
1252 #define INTEGER_CASE(IN_TYPE) \
1253 ConvertIntegerWithNulls<IN_TYPE, T>(this->options_, *data, out_values); \
1254 break;
1255
1256 switch (in_type) {
1257 case Type::UINT8:
1258 INTEGER_CASE(uint8_t);
1259 case Type::INT8:
1260 INTEGER_CASE(int8_t);
1261 case Type::UINT16:
1262 INTEGER_CASE(uint16_t);
1263 case Type::INT16:
1264 INTEGER_CASE(int16_t);
1265 case Type::UINT32:
1266 INTEGER_CASE(uint32_t);
1267 case Type::INT32:
1268 INTEGER_CASE(int32_t);
1269 case Type::UINT64:
1270 INTEGER_CASE(uint64_t);
1271 case Type::INT64:
1272 INTEGER_CASE(int64_t);
1273 case Type::HALF_FLOAT:
1274         ConvertNumericNullableCast(*data, npy_traits<NPY_TYPE>::na_sentinel, out_values);
        break;
1275 case Type::FLOAT:
1276 ConvertNumericNullableCast(*data, npy_traits<NPY_TYPE>::na_sentinel, out_values);
1277 break;
1278 case Type::DOUBLE:
1279 ConvertNumericNullableCast(*data, npy_traits<NPY_TYPE>::na_sentinel, out_values);
1280 break;
1281 default:
1282 return Status::NotImplemented("Cannot write Arrow data of type ",
1283 data->type()->ToString(),
1284 " to a Pandas floating point block");
1285 }
1286
1287 #undef INTEGER_CASE
1288
1289 return Status::OK();
1290 }
1291 };
1292
1293 using UInt8Writer = IntWriter<NPY_UINT8>;
1294 using Int8Writer = IntWriter<NPY_INT8>;
1295 using UInt16Writer = IntWriter<NPY_UINT16>;
1296 using Int16Writer = IntWriter<NPY_INT16>;
1297 using UInt32Writer = IntWriter<NPY_UINT32>;
1298 using Int32Writer = IntWriter<NPY_INT32>;
1299 using UInt64Writer = IntWriter<NPY_UINT64>;
1300 using Int64Writer = IntWriter<NPY_INT64>;
1301 using Float16Writer = FloatWriter<NPY_FLOAT16>;
1302 using Float32Writer = FloatWriter<NPY_FLOAT32>;
1303 using Float64Writer = FloatWriter<NPY_FLOAT64>;
1304
1305 class BoolWriter : public TypedPandasWriter<NPY_BOOL> {
1306 public:
1307 using TypedPandasWriter<NPY_BOOL>::TypedPandasWriter;
1308
1309 Status TransferSingle(std::shared_ptr<ChunkedArray> data, PyObject* py_ref) override {
1310 RETURN_NOT_OK(
1311 CheckNoZeroCopy("Zero copy conversions not possible with "
1312 "boolean types"));
1313 RETURN_NOT_OK(EnsureAllocated());
1314 return CopyInto(data, /*rel_placement=*/0);
1315 }
1316
1317 Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
1318 RETURN_NOT_OK(this->CheckTypeExact(*data->type(), Type::BOOL));
1319 auto out_values = this->GetBlockColumnStart(rel_placement);
1320 for (int c = 0; c < data->num_chunks(); c++) {
1321 const auto& arr = checked_cast<const BooleanArray&>(*data->chunk(c));
1322 for (int64_t i = 0; i < arr.length(); ++i) {
1323 *out_values++ = static_cast<uint8_t>(arr.Value(i));
1324 }
1325 }
1326 return Status::OK();
1327 }
1328 };
1329
1330 // ----------------------------------------------------------------------
1331 // Date / timestamp types
1332
1333 template <typename T, int64_t SHIFT>
1334 inline void ConvertDatetimeLikeNanos(const ChunkedArray& data, int64_t* out_values) {
1335 for (int c = 0; c < data.num_chunks(); c++) {
1336 const auto& arr = *data.chunk(c);
1337 const T* in_values = GetPrimitiveValues<T>(arr);
1338
1339 for (int64_t i = 0; i < arr.length(); ++i) {
1340 *out_values++ = arr.IsNull(i) ? kPandasTimestampNull
1341 : (static_cast<int64_t>(in_values[i]) * SHIFT);
1342 }
1343 }
1344 }
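// For example, ConvertDatetimeLikeNanos<int32_t, kNanosecondsInDay> (used for
// DATE32 below) multiplies each day count by 86,400,000,000,000, so day 1
// (1970-01-02) becomes 86,400,000,000,000 ns in the datetime64[ns] output.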
1345
1346 template <typename T, int SHIFT>
1347 void ConvertDatesShift(const ChunkedArray& data, int64_t* out_values) {
1348 for (int c = 0; c < data.num_chunks(); c++) {
1349 const auto& arr = *data.chunk(c);
1350 const T* in_values = GetPrimitiveValues<T>(arr);
1351 for (int64_t i = 0; i < arr.length(); ++i) {
1352 *out_values++ = arr.IsNull(i) ? kPandasTimestampNull
1353 : static_cast<int64_t>(in_values[i]) / SHIFT;
1354 }
1355 }
1356 }
1357
1358 class DatetimeDayWriter : public TypedPandasWriter<NPY_DATETIME> {
1359 public:
1360 using TypedPandasWriter<NPY_DATETIME>::TypedPandasWriter;
1361
1362 Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
1363 int64_t* out_values = this->GetBlockColumnStart(rel_placement);
1364 const auto& type = checked_cast<const DateType&>(*data->type());
1365 switch (type.unit()) {
1366 case DateUnit::DAY:
1367 ConvertDatesShift<int32_t, 1LL>(*data, out_values);
1368 break;
1369 case DateUnit::MILLI:
1370 ConvertDatesShift<int64_t, 86400000LL>(*data, out_values);
1371 break;
1372 }
1373 return Status::OK();
1374 }
1375
1376 protected:
1377 Status Allocate() override {
1378 RETURN_NOT_OK(this->AllocateNDArray(NPY_DATETIME));
1379 SetDatetimeUnit(NPY_FR_D);
1380 return Status::OK();
1381 }
1382 };
1383
1384 template <TimeUnit::type UNIT>
1385 class DatetimeWriter : public TypedPandasWriter<NPY_DATETIME> {
1386 public:
1387 using TypedPandasWriter<NPY_DATETIME>::TypedPandasWriter;
1388
1389 bool CanZeroCopy(const ChunkedArray& data) const override {
1390 if (data.type()->id() == Type::TIMESTAMP) {
1391 const auto& type = checked_cast<const TimestampType&>(*data.type());
1392 return IsNonNullContiguous(data) && type.unit() == UNIT;
1393 } else {
1394 return false;
1395 }
1396 }
1397
1398 Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
1399 const auto& ts_type = checked_cast<const TimestampType&>(*data->type());
1400 DCHECK_EQ(UNIT, ts_type.unit()) << "Should only call instances of this writer "
1401 << "with arrays of the correct unit";
1402 ConvertNumericNullable<int64_t>(*data, kPandasTimestampNull,
1403 this->GetBlockColumnStart(rel_placement));
1404 return Status::OK();
1405 }
1406
1407 protected:
1408 Status Allocate() override {
1409 RETURN_NOT_OK(this->AllocateNDArray(NPY_DATETIME));
1410 SetDatetimeUnit(internal::NumPyFrequency(UNIT));
1411 return Status::OK();
1412 }
1413 };
1414
1415 using DatetimeSecondWriter = DatetimeWriter<TimeUnit::SECOND>;
1416 using DatetimeMilliWriter = DatetimeWriter<TimeUnit::MILLI>;
1417 using DatetimeMicroWriter = DatetimeWriter<TimeUnit::MICRO>;
1418
1419 class DatetimeNanoWriter : public DatetimeWriter<TimeUnit::NANO> {
1420 public:
1421 using DatetimeWriter<TimeUnit::NANO>::DatetimeWriter;
1422
1423 Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
1424 Type::type type = data->type()->id();
1425 int64_t* out_values = this->GetBlockColumnStart(rel_placement);
1426 compute::ExecContext ctx(options_.pool);
1427 compute::CastOptions options;
1428 if (options_.safe_cast) {
1429 options = compute::CastOptions::Safe();
1430 } else {
1431 options = compute::CastOptions::Unsafe();
1432 }
1433 Datum out;
1434 auto target_type = timestamp(TimeUnit::NANO);
1435
1436 if (type == Type::DATE32) {
1437 // Convert from days since epoch to datetime64[ns]
1438 ConvertDatetimeLikeNanos<int32_t, kNanosecondsInDay>(*data, out_values);
1439 } else if (type == Type::DATE64) {
1440 // Date64Type is millisecond timestamp stored as int64_t
1441 // TODO(wesm): Do we want to make sure to zero out the milliseconds?
1442 ConvertDatetimeLikeNanos<int64_t, 1000000L>(*data, out_values);
1443 } else if (type == Type::TIMESTAMP) {
1444 const auto& ts_type = checked_cast<const TimestampType&>(*data->type());
1445
1446 if (ts_type.unit() == TimeUnit::NANO) {
1447 ConvertNumericNullable<int64_t>(*data, kPandasTimestampNull, out_values);
1448 } else if (ts_type.unit() == TimeUnit::MICRO || ts_type.unit() == TimeUnit::MILLI ||
1449 ts_type.unit() == TimeUnit::SECOND) {
1450 ARROW_ASSIGN_OR_RAISE(out, compute::Cast(data, target_type, options, &ctx));
1451 ConvertNumericNullable<int64_t>(*out.chunked_array(), kPandasTimestampNull,
1452 out_values);
1453 } else {
1454 return Status::NotImplemented("Unsupported time unit");
1455 }
1456 } else {
1457 return Status::NotImplemented("Cannot write Arrow data of type ",
1458 data->type()->ToString(),
1459 " to a Pandas datetime block.");
1460 }
1461 return Status::OK();
1462 }
1463 };
1464
1465 class DatetimeTZWriter : public DatetimeNanoWriter {
1466 public:
1467 DatetimeTZWriter(const PandasOptions& options, const std::string& timezone,
1468 int64_t num_rows)
1469 : DatetimeNanoWriter(options, num_rows, 1), timezone_(timezone) {}
1470
1471 protected:
1472 Status GetResultBlock(PyObject** out) override {
1473 RETURN_NOT_OK(MakeBlock1D());
1474 *out = block_arr_.obj();
1475 return Status::OK();
1476 }
1477
1478 Status AddResultMetadata(PyObject* result) override {
1479 PyObject* py_tz = PyUnicode_FromStringAndSize(
1480 timezone_.c_str(), static_cast<Py_ssize_t>(timezone_.size()));
1481 RETURN_IF_PYERROR();
1482 PyDict_SetItemString(result, "timezone", py_tz);
1483 Py_DECREF(py_tz);
1484 return Status::OK();
1485 }
1486
1487 private:
1488 std::string timezone_;
1489 };
1490
1491 template <TimeUnit::type UNIT>
1492 class TimedeltaWriter : public TypedPandasWriter<NPY_TIMEDELTA> {
1493 public:
1494 using TypedPandasWriter<NPY_TIMEDELTA>::TypedPandasWriter;
1495
1496 Status AllocateTimedelta(int ndim) {
1497 RETURN_NOT_OK(this->AllocateNDArray(NPY_TIMEDELTA, ndim));
1498 SetDatetimeUnit(internal::NumPyFrequency(UNIT));
1499 return Status::OK();
1500 }
1501
1502 bool CanZeroCopy(const ChunkedArray& data) const override {
1503 const auto& type = checked_cast<const DurationType&>(*data.type());
1504 return IsNonNullContiguous(data) && type.unit() == UNIT;
1505 }
1506
1507 Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
1508 const auto& type = checked_cast<const DurationType&>(*data->type());
1509 DCHECK_EQ(UNIT, type.unit()) << "Should only call instances of this writer "
1510 << "with arrays of the correct unit";
1511 ConvertNumericNullable<int64_t>(*data, kPandasTimestampNull,
1512 this->GetBlockColumnStart(rel_placement));
1513 return Status::OK();
1514 }
1515
1516 protected:
1517 Status Allocate() override { return AllocateTimedelta(2); }
1518 };
1519
1520 using TimedeltaSecondWriter = TimedeltaWriter<TimeUnit::SECOND>;
1521 using TimedeltaMilliWriter = TimedeltaWriter<TimeUnit::MILLI>;
1522 using TimedeltaMicroWriter = TimedeltaWriter<TimeUnit::MICRO>;
1523
1524 class TimedeltaNanoWriter : public TimedeltaWriter<TimeUnit::NANO> {
1525 public:
1526 using TimedeltaWriter<TimeUnit::NANO>::TimedeltaWriter;
1527
1528 Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
1529 Type::type type = data->type()->id();
1530 int64_t* out_values = this->GetBlockColumnStart(rel_placement);
1531 if (type == Type::DURATION) {
1532 const auto& ts_type = checked_cast<const DurationType&>(*data->type());
1533 if (ts_type.unit() == TimeUnit::NANO) {
1534 ConvertNumericNullable<int64_t>(*data, kPandasTimestampNull, out_values);
1535 } else if (ts_type.unit() == TimeUnit::MICRO) {
1536 ConvertDatetimeLikeNanos<int64_t, 1000L>(*data, out_values);
1537 } else if (ts_type.unit() == TimeUnit::MILLI) {
1538 ConvertDatetimeLikeNanos<int64_t, 1000000L>(*data, out_values);
1539 } else if (ts_type.unit() == TimeUnit::SECOND) {
1540 ConvertDatetimeLikeNanos<int64_t, 1000000000L>(*data, out_values);
1541 } else {
1542 return Status::NotImplemented("Unsupported time unit");
1543 }
1544 } else {
1545 return Status::NotImplemented("Cannot write Arrow data of type ",
1546 data->type()->ToString(),
1547 " to a Pandas timedelta block.");
1548 }
1549 return Status::OK();
1550 }
1551 };
1552
1553 Status MakeZeroLengthArray(const std::shared_ptr<DataType>& type,
1554 std::shared_ptr<Array>* out) {
1555 std::unique_ptr<ArrayBuilder> builder;
1556 RETURN_NOT_OK(MakeBuilder(default_memory_pool(), type, &builder));
1557 RETURN_NOT_OK(builder->Resize(0));
1558 return builder->Finish(out);
1559 }
1560
1561 bool NeedDictionaryUnification(const ChunkedArray& data) {
1562 if (data.num_chunks() < 2) {
1563 return false;
1564 }
1565 const auto& arr_first = checked_cast<const DictionaryArray&>(*data.chunk(0));
1566 for (int c = 1; c < data.num_chunks(); c++) {
1567 const auto& arr = checked_cast<const DictionaryArray&>(*data.chunk(c));
1568 if (!(arr_first.dictionary()->Equals(arr.dictionary()))) {
1569 return true;
1570 }
1571 }
1572 return false;
1573 }
1574
1575 template <typename IndexType>
1576 class CategoricalWriter
1577 : public TypedPandasWriter<arrow_traits<IndexType::type_id>::npy_type> {
1578 public:
1579 using TRAITS = arrow_traits<IndexType::type_id>;
1580 using ArrayType = typename TypeTraits<IndexType>::ArrayType;
1581 using T = typename TRAITS::T;
1582
1583 explicit CategoricalWriter(const PandasOptions& options, int64_t num_rows)
1584 : TypedPandasWriter<TRAITS::npy_type>(options, num_rows, 1),
1585 ordered_(false),
1586 needs_copy_(false) {}
1587
1588 Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
1589 return Status::NotImplemented("categorical type");
1590 }
1591
1592 Status TransferSingle(std::shared_ptr<ChunkedArray> data, PyObject* py_ref) override {
1593 const auto& dict_type = checked_cast<const DictionaryType&>(*data->type());
1594 std::shared_ptr<Array> dict;
1595 if (data->num_chunks() == 0) {
1596 // no dictionary values => create empty array
1597 RETURN_NOT_OK(this->AllocateNDArray(TRAITS::npy_type, 1));
1598 RETURN_NOT_OK(MakeZeroLengthArray(dict_type.value_type(), &dict));
1599 } else {
1600 DCHECK_EQ(IndexType::type_id, dict_type.index_type()->id());
1601 RETURN_NOT_OK(WriteIndices(*data, &dict));
1602 }
1603
1604 PyObject* pydict;
1605 RETURN_NOT_OK(ConvertArrayToPandas(this->options_, dict, nullptr, &pydict));
1606 dictionary_.reset(pydict);
1607 ordered_ = dict_type.ordered();
1608 return Status::OK();
1609 }
1610
1611 Status Write(std::shared_ptr<ChunkedArray> data, int64_t abs_placement,
1612 int64_t rel_placement) override {
1613 RETURN_NOT_OK(this->EnsurePlacementAllocated());
1614 RETURN_NOT_OK(TransferSingle(data, /*py_ref=*/nullptr));
1615 this->placement_data_[rel_placement] = abs_placement;
1616 return Status::OK();
1617 }
1618
1619 Status GetSeriesResult(PyObject** out) override {
1620 PyAcquireGIL lock;
1621
1622 PyObject* result = PyDict_New();
1623 RETURN_IF_PYERROR();
1624
1625 // Expected single-array dictionary layout; 'dictionary' and 'ordered' are added in AddResultMetadata
1626 PyDict_SetItemString(result, "indices", this->block_arr_.obj());
1627 RETURN_IF_PYERROR();
1628 RETURN_NOT_OK(AddResultMetadata(result));
1629
1630 *out = result;
1631 return Status::OK();
1632 }
1633
1634 protected:
1635 Status AddResultMetadata(PyObject* result) override {
1636 PyDict_SetItemString(result, "dictionary", dictionary_.obj());
1637 PyObject* py_ordered = ordered_ ? Py_True : Py_False;
1638 Py_INCREF(py_ordered);
1639 PyDict_SetItemString(result, "ordered", py_ordered);
1640 return Status::OK();
1641 }
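// Illustrative sketch of the Series result assembled by GetSeriesResult plus
// AddResultMetadata (values are hypothetical): for a dictionary<int8, utf8>
// column logically equal to ["a", "b", null], the returned Python dict is
// roughly
//
//   {"indices": int8 ndarray [0, 1, -1],
//    "dictionary": object ndarray ["a", "b"],
//    "ordered": False}
//
// which the caller can hand to pandas.Categorical.from_codes or similar.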
1642
1643 Status WriteIndicesUniform(const ChunkedArray& data) {
1644 RETURN_NOT_OK(this->AllocateNDArray(TRAITS::npy_type, 1));
1645 T* out_values = reinterpret_cast<T*>(this->block_data_);
1646
1647 for (int c = 0; c < data.num_chunks(); c++) {
1648 const auto& arr = checked_cast<const DictionaryArray&>(*data.chunk(c));
1649 const auto& indices = checked_cast<const ArrayType&>(*arr.indices());
1650 auto values = reinterpret_cast<const T*>(indices.raw_values());
1651
1652 RETURN_NOT_OK(CheckIndexBounds(*indices.data(), arr.dictionary()->length()));
1653 // Null is -1 in CategoricalBlock
1654 for (int i = 0; i < arr.length(); ++i) {
1655 if (indices.IsValid(i)) {
1656 *out_values++ = values[i];
1657 } else {
1658 *out_values++ = -1;
1659 }
1660 }
1661 }
1662 return Status::OK();
1663 }
1664
1665 Status WriteIndicesVarying(const ChunkedArray& data, std::shared_ptr<Array>* out_dict) {
1666 // Yield int32 indices so that the unified dictionary can outgrow the
1667 // original (possibly narrower) index type
1668 RETURN_NOT_OK(this->AllocateNDArray(NPY_INT32, 1));
1669 auto out_values = reinterpret_cast<int32_t*>(this->block_data_);
1670
1671 const auto& dict_type = checked_cast<const DictionaryType&>(*data.type());
1672
1673 ARROW_ASSIGN_OR_RAISE(auto unifier, DictionaryUnifier::Make(dict_type.value_type(),
1674 this->options_.pool));
1675 for (int c = 0; c < data.num_chunks(); c++) {
1676 const auto& arr = checked_cast<const DictionaryArray&>(*data.chunk(c));
1677 const auto& indices = checked_cast<const ArrayType&>(*arr.indices());
1678 auto values = reinterpret_cast<const T*>(indices.raw_values());
1679
1680 std::shared_ptr<Buffer> transpose_buffer;
1681 RETURN_NOT_OK(unifier->Unify(*arr.dictionary(), &transpose_buffer));
1682
1683 auto transpose = reinterpret_cast<const int32_t*>(transpose_buffer->data());
1684 int64_t dict_length = arr.dictionary()->length();
1685
1686 RETURN_NOT_OK(CheckIndexBounds(*indices.data(), dict_length));
1687
1688 // Null is -1 in CategoricalBlock
1689 for (int i = 0; i < arr.length(); ++i) {
1690 if (indices.IsValid(i)) {
1691 *out_values++ = transpose[values[i]];
1692 } else {
1693 *out_values++ = -1;
1694 }
1695 }
1696 }
1697
1698 std::shared_ptr<DataType> unused_type;
1699 return unifier->GetResult(&unused_type, out_dict);
1700 }
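// Worked example of the transposition above (values hypothetical): with
// chunk 0 dictionary ["a", "b"] and chunk 1 dictionary ["b", "c"], the unifier
// might produce the unified dictionary ["a", "b", "c"] with per-chunk
// transpose maps [0, 1] and [1, 2]; chunk 1's local index 0 ("b") is then
// rewritten to unified index 1 before being stored in the int32 output.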
1701
1702 Status WriteIndices(const ChunkedArray& data, std::shared_ptr<Array>* out_dict) {
1703 DCHECK_GT(data.num_chunks(), 0);
1704
1705 // Sniff the first chunk
1706 const auto& arr_first = checked_cast<const DictionaryArray&>(*data.chunk(0));
1707 const auto indices_first = std::static_pointer_cast<ArrayType>(arr_first.indices());
1708
1709 if (data.num_chunks() == 1 && indices_first->null_count() == 0) {
1710 RETURN_NOT_OK(
1711 CheckIndexBounds(*indices_first->data(), arr_first.dictionary()->length()));
1712
1713 PyObject* wrapped;
1714 npy_intp dims[1] = {static_cast<npy_intp>(this->num_rows_)};
1715 RETURN_NOT_OK(MakeNumPyView(indices_first, /*py_ref=*/nullptr, TRAITS::npy_type,
1716 /*ndim=*/1, dims, &wrapped));
1717 this->SetBlockData(wrapped);
1718 *out_dict = arr_first.dictionary();
1719 } else {
1720 RETURN_NOT_OK(this->CheckNotZeroCopyOnly(data));
1721 if (NeedDictionaryUnification(data)) {
1722 RETURN_NOT_OK(WriteIndicesVarying(data, out_dict));
1723 } else {
1724 RETURN_NOT_OK(WriteIndicesUniform(data));
1725 *out_dict = arr_first.dictionary();
1726 }
1727 }
1728 return Status::OK();
1729 }
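// Note on the fast path above: a single chunk without nulls can be exposed to
// NumPy zero-copy through MakeNumPyView; any other shape (multiple chunks, or
// nulls that must become -1) requires materializing a fresh index array, which
// is why the copying branch is guarded by CheckNotZeroCopyOnly.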
1730
1731 OwnedRefNoGIL dictionary_;
1732 bool ordered_;
1733 bool needs_copy_;
1734 };
1735
1736 class ExtensionWriter : public PandasWriter {
1737 public:
1738 using PandasWriter::PandasWriter;
1739
1740 Status Allocate() override {
1741 // no-op
1742 return Status::OK();
1743 }
1744
1745 Status TransferSingle(std::shared_ptr<ChunkedArray> data, PyObject* py_ref) override {
1746 PyAcquireGIL lock;
1747 PyObject* py_array;
1748 py_array = wrap_chunked_array(data);
1749 py_array_.reset(py_array);
1750
1751 return Status::OK();
1752 }
1753
1754 Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
1755 return TransferSingle(data, nullptr);
1756 }
1757
1758 Status GetDataFrameResult(PyObject** out) override {
1759 PyAcquireGIL lock;
1760 PyObject* result = PyDict_New();
1761 RETURN_IF_PYERROR();
1762
1763 PyDict_SetItemString(result, "py_array", py_array_.obj());
1764 PyDict_SetItemString(result, "placement", placement_arr_.obj());
1765 *out = result;
1766 return Status::OK();
1767 }
1768
1769 Status GetSeriesResult(PyObject** out) override {
1770 *out = py_array_.detach();
1771 return Status::OK();
1772 }
1773
1774 protected:
1775 OwnedRefNoGIL py_array_;
1776 };
1777
1778 Status MakeWriter(const PandasOptions& options, PandasWriter::type writer_type,
1779 const DataType& type, int64_t num_rows, int num_columns,
1780 std::shared_ptr<PandasWriter>* writer) {
1781 #define BLOCK_CASE(NAME, TYPE) \
1782 case PandasWriter::NAME: \
1783 *writer = std::make_shared<TYPE>(options, num_rows, num_columns); \
1784 break;
1785
1786 #define CATEGORICAL_CASE(TYPE) \
1787 case TYPE::type_id: \
1788 *writer = std::make_shared<CategoricalWriter<TYPE>>(options, num_rows); \
1789 break;
1790
1791 switch (writer_type) {
1792 case PandasWriter::CATEGORICAL: {
1793 const auto& index_type = *checked_cast<const DictionaryType&>(type).index_type();
1794 switch (index_type.id()) {
1795 CATEGORICAL_CASE(Int8Type);
1796 CATEGORICAL_CASE(Int16Type);
1797 CATEGORICAL_CASE(Int32Type);
1798 CATEGORICAL_CASE(Int64Type);
1799 case Type::UINT8:
1800 case Type::UINT16:
1801 case Type::UINT32:
1802 case Type::UINT64:
1803 return Status::TypeError(
1804 "Converting unsigned dictionary indices to pandas",
1805 " not yet supported, index type: ", index_type.ToString());
1806 default:
1807 // Unreachable
1808 DCHECK(false);
1809 break;
1810 }
1811 } break;
1812 case PandasWriter::EXTENSION:
1813 *writer = std::make_shared<ExtensionWriter>(options, num_rows, num_columns);
1814 break;
1815 BLOCK_CASE(OBJECT, ObjectWriter);
1816 BLOCK_CASE(UINT8, UInt8Writer);
1817 BLOCK_CASE(INT8, Int8Writer);
1818 BLOCK_CASE(UINT16, UInt16Writer);
1819 BLOCK_CASE(INT16, Int16Writer);
1820 BLOCK_CASE(UINT32, UInt32Writer);
1821 BLOCK_CASE(INT32, Int32Writer);
1822 BLOCK_CASE(UINT64, UInt64Writer);
1823 BLOCK_CASE(INT64, Int64Writer);
1824 BLOCK_CASE(HALF_FLOAT, Float16Writer);
1825 BLOCK_CASE(FLOAT, Float32Writer);
1826 BLOCK_CASE(DOUBLE, Float64Writer);
1827 BLOCK_CASE(BOOL, BoolWriter);
1828 BLOCK_CASE(DATETIME_DAY, DatetimeDayWriter);
1829 BLOCK_CASE(DATETIME_SECOND, DatetimeSecondWriter);
1830 BLOCK_CASE(DATETIME_MILLI, DatetimeMilliWriter);
1831 BLOCK_CASE(DATETIME_MICRO, DatetimeMicroWriter);
1832 BLOCK_CASE(DATETIME_NANO, DatetimeNanoWriter);
1833 BLOCK_CASE(TIMEDELTA_SECOND, TimedeltaSecondWriter);
1834 BLOCK_CASE(TIMEDELTA_MILLI, TimedeltaMilliWriter);
1835 BLOCK_CASE(TIMEDELTA_MICRO, TimedeltaMicroWriter);
1836 BLOCK_CASE(TIMEDELTA_NANO, TimedeltaNanoWriter);
1837 case PandasWriter::DATETIME_NANO_TZ: {
1838 const auto& ts_type = checked_cast<const TimestampType&>(type);
1839 *writer = std::make_shared<DatetimeTZWriter>(options, ts_type.timezone(), num_rows);
1840 } break;
1841 default:
1842 return Status::NotImplemented("Unsupported block type");
1843 }
1844
1845 #undef BLOCK_CASE
1846 #undef CATEGORICAL_CASE
1847
1848 return Status::OK();
1849 }
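// For reference, the dispatch macros above expand mechanically; for example,
// BLOCK_CASE(DOUBLE, Float64Writer) becomes
//
//   case PandasWriter::DOUBLE:
//     *writer = std::make_shared<Float64Writer>(options, num_rows, num_columns);
//     break;
//
// and CATEGORICAL_CASE(Int8Type) becomes the analogous case keyed on
// Int8Type::type_id that constructs a CategoricalWriter<Int8Type>.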
1850
1851 static Status GetPandasWriterType(const ChunkedArray& data, const PandasOptions& options,
1852 PandasWriter::type* output_type) {
1853 #define INTEGER_CASE(NAME) \
1854 *output_type = \
1855 data.null_count() > 0 \
1856 ? options.integer_object_nulls ? PandasWriter::OBJECT : PandasWriter::DOUBLE \
1857 : PandasWriter::NAME; \
1858 break;
1859
1860 switch (data.type()->id()) {
1861 case Type::BOOL:
1862 *output_type = data.null_count() > 0 ? PandasWriter::OBJECT : PandasWriter::BOOL;
1863 break;
1864 case Type::UINT8:
1865 INTEGER_CASE(UINT8);
1866 case Type::INT8:
1867 INTEGER_CASE(INT8);
1868 case Type::UINT16:
1869 INTEGER_CASE(UINT16);
1870 case Type::INT16:
1871 INTEGER_CASE(INT16);
1872 case Type::UINT32:
1873 INTEGER_CASE(UINT32);
1874 case Type::INT32:
1875 INTEGER_CASE(INT32);
1876 case Type::UINT64:
1877 INTEGER_CASE(UINT64);
1878 case Type::INT64:
1879 INTEGER_CASE(INT64);
1880 case Type::HALF_FLOAT:
1881 *output_type = PandasWriter::HALF_FLOAT;
1882 break;
1883 case Type::FLOAT:
1884 *output_type = PandasWriter::FLOAT;
1885 break;
1886 case Type::DOUBLE:
1887 *output_type = PandasWriter::DOUBLE;
1888 break;
1889 case Type::STRING: // fall through
1890 case Type::LARGE_STRING: // fall through
1891 case Type::BINARY: // fall through
1892 case Type::LARGE_BINARY: // fall through
1893 case Type::NA: // fall through
1894 case Type::FIXED_SIZE_BINARY: // fall through
1895 case Type::STRUCT: // fall through
1896 case Type::TIME32: // fall through
1897 case Type::TIME64: // fall through
1898 case Type::DECIMAL128: // fall through
1899 case Type::DECIMAL256: // fall through
1900 case Type::INTERVAL_MONTH_DAY_NANO:
1901 *output_type = PandasWriter::OBJECT;
1902 break;
1903 case Type::DATE32: // fall through
1904 case Type::DATE64:
1905 if (options.date_as_object) {
1906 *output_type = PandasWriter::OBJECT;
1907 } else {
1908 *output_type = options.coerce_temporal_nanoseconds ? PandasWriter::DATETIME_NANO
1909 : PandasWriter::DATETIME_DAY;
1910 }
1911 break;
1912 case Type::TIMESTAMP: {
1913 const auto& ts_type = checked_cast<const TimestampType&>(*data.type());
1914 if (options.timestamp_as_object && ts_type.unit() != TimeUnit::NANO) {
1915 // Nanosecond values are already pandas' native resolution and can never be
1916 // out of bounds, so timestamp_as_object only applies to coarser units
1917 *output_type = PandasWriter::OBJECT;
1918 } else if (!ts_type.timezone().empty()) {
1919 *output_type = PandasWriter::DATETIME_NANO_TZ;
1920 } else if (options.coerce_temporal_nanoseconds) {
1921 *output_type = PandasWriter::DATETIME_NANO;
1922 } else {
1923 switch (ts_type.unit()) {
1924 case TimeUnit::SECOND:
1925 *output_type = PandasWriter::DATETIME_SECOND;
1926 break;
1927 case TimeUnit::MILLI:
1928 *output_type = PandasWriter::DATETIME_MILLI;
1929 break;
1930 case TimeUnit::MICRO:
1931 *output_type = PandasWriter::DATETIME_MICRO;
1932 break;
1933 case TimeUnit::NANO:
1934 *output_type = PandasWriter::DATETIME_NANO;
1935 break;
1936 }
1937 }
1938 } break;
1939 case Type::DURATION: {
1940 const auto& dur_type = checked_cast<const DurationType&>(*data.type());
1941 if (options.coerce_temporal_nanoseconds) {
1942 *output_type = PandasWriter::TIMEDELTA_NANO;
1943 } else {
1944 switch (dur_type.unit()) {
1945 case TimeUnit::SECOND:
1946 *output_type = PandasWriter::TIMEDELTA_SECOND;
1947 break;
1948 case TimeUnit::MILLI:
1949 *output_type = PandasWriter::TIMEDELTA_MILLI;
1950 break;
1951 case TimeUnit::MICRO:
1952 *output_type = PandasWriter::TIMEDELTA_MICRO;
1953 break;
1954 case TimeUnit::NANO:
1955 *output_type = PandasWriter::TIMEDELTA_NANO;
1956 break;
1957 }
1958 }
1959 } break;
1960 case Type::FIXED_SIZE_LIST:
1961 case Type::LIST:
1962 case Type::LARGE_LIST:
1963 case Type::MAP: {
1964 auto list_type = std::static_pointer_cast<BaseListType>(data.type());
1965 if (!ListTypeSupported(*list_type->value_type())) {
1966 return Status::NotImplemented("Not implemented type for Arrow list to pandas: ",
1967 list_type->value_type()->ToString());
1968 }
1969 *output_type = PandasWriter::OBJECT;
1970 } break;
1971 case Type::DICTIONARY:
1972 *output_type = PandasWriter::CATEGORICAL;
1973 break;
1974 case Type::EXTENSION:
1975 *output_type = PandasWriter::EXTENSION;
1976 break;
1977 default:
1978 return Status::NotImplemented(
1979 "No known equivalent Pandas block for Arrow data of type ",
1980 data.type()->ToString());
1981 }
1982 return Status::OK();
1983 }
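// A few mappings implied by the switch above (assuming timestamp_as_object and
// coerce_temporal_nanoseconds are disabled unless noted):
//   * int64, no nulls                 -> PandasWriter::INT64
//   * int64 with nulls                -> DOUBLE, or OBJECT when
//                                        options.integer_object_nulls is set
//   * timestamp[ms], no timezone      -> DATETIME_MILLI
//   * timestamp[us] with a timezone   -> DATETIME_NANO_TZ
//   * dictionary<int32, utf8>         -> CATEGORICAL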
1984
1985 // Construct the exact pandas "BlockManager" memory layout
1986 //
1987 // * For each column determine the correct output pandas type
1988 // * Allocate 2D blocks (ncols x nrows) for each distinct data type in output
1989 // * Allocate block placement arrays
1990 // * Write Arrow columns out into each slice of memory; populate block
1991 //   placement arrays as we go
1992 class PandasBlockCreator {
1993 public:
1994 using WriterMap = std::unordered_map<int, std::shared_ptr<PandasWriter>>;
1995
1996 explicit PandasBlockCreator(const PandasOptions& options, FieldVector fields,
1997 ChunkedArrayVector arrays)
1998 : options_(options), fields_(std::move(fields)), arrays_(std::move(arrays)) {
1999 num_columns_ = static_cast<int>(arrays_.size());
2000 if (num_columns_ > 0) {
2001 num_rows_ = arrays_[0]->length();
2002 }
2003 column_block_placement_.resize(num_columns_);
2004 }
2005 virtual ~PandasBlockCreator() = default;
2006
2007 virtual Status Convert(PyObject** out) = 0;
2008
2009 Status AppendBlocks(const WriterMap& blocks, PyObject* list) {
2010 for (const auto& it : blocks) {
2011 PyObject* item;
2012 RETURN_NOT_OK(it.second->GetDataFrameResult(&item));
2013 if (PyList_Append(list, item) < 0) {
2014 RETURN_IF_PYERROR();
2015 }
2016
2017 // ARROW-1017; PyList_Append increments object refcount
2018 Py_DECREF(item);
2019 }
2020 return Status::OK();
2021 }
2022
2023 protected:
2024 PandasOptions options_;
2025
2026 FieldVector fields_;
2027 ChunkedArrayVector arrays_;
2028 int num_columns_;
2029 int64_t num_rows_ = 0;  // stays 0 when there are no columns
2030
2031 // column num -> relative placement within internal block
2032 std::vector<int> column_block_placement_;
2033 };
2034
2035 class ConsolidatedBlockCreator : public PandasBlockCreator {
2036 public:
2037 using PandasBlockCreator::PandasBlockCreator;
2038
2039 Status Convert(PyObject** out) override {
2040 column_types_.resize(num_columns_);
2041 RETURN_NOT_OK(CreateBlocks());
2042 RETURN_NOT_OK(WriteTableToBlocks());
2043 PyAcquireGIL lock;
2044
2045 PyObject* result = PyList_New(0);
2046 RETURN_IF_PYERROR();
2047
2048 RETURN_NOT_OK(AppendBlocks(blocks_, result));
2049 RETURN_NOT_OK(AppendBlocks(singleton_blocks_, result));
2050
2051 *out = result;
2052 return Status::OK();
2053 }
2054
2055 Status GetBlockType(int column_index, PandasWriter::type* out) {
2056 if (options_.extension_columns.count(fields_[column_index]->name())) {
2057 *out = PandasWriter::EXTENSION;
2058 return Status::OK();
2059 } else {
2060 return GetPandasWriterType(*arrays_[column_index], options_, out);
2061 }
2062 }
2063
2064 Status CreateBlocks() {
2065 for (int i = 0; i < num_columns_; ++i) {
2066 const DataType& type = *arrays_[i]->type();
2067 PandasWriter::type output_type;
2068 RETURN_NOT_OK(GetBlockType(i, &output_type));
2069
2070 int block_placement = 0;
2071 std::shared_ptr<PandasWriter> writer;
2072 if (output_type == PandasWriter::CATEGORICAL ||
2073 output_type == PandasWriter::DATETIME_NANO_TZ ||
2074 output_type == PandasWriter::EXTENSION) {
2075 RETURN_NOT_OK(MakeWriter(options_, output_type, type, num_rows_,
2076 /*num_columns=*/1, &writer));
2077 singleton_blocks_[i] = writer;
2078 } else {
2079 auto it = block_sizes_.find(output_type);
2080 if (it != block_sizes_.end()) {
2081 block_placement = it->second;
2082 // Increment count
2083 ++it->second;
2084 } else {
2085 // Add key to map
2086 block_sizes_[output_type] = 1;
2087 }
2088 }
2089 column_types_[i] = output_type;
2090 column_block_placement_[i] = block_placement;
2091 }
2092
2093 // Create normal non-categorical blocks
2094 for (const auto& it : this->block_sizes_) {
2095 PandasWriter::type output_type = static_cast<PandasWriter::type>(it.first);
2096 std::shared_ptr<PandasWriter> block;
2097 RETURN_NOT_OK(MakeWriter(this->options_, output_type, /*unused*/ *null(), num_rows_,
2098 it.second, &block));
2099 this->blocks_[output_type] = block;
2100 }
2101 return Status::OK();
2102 }
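// Placement example for the consolidation above (columns without nulls, so
// each maps to its natural block type): given columns [int64, double, int64,
// double], the two INT64 columns share one 2 x num_rows block with relative
// placements 0 and 1, and the two DOUBLE columns share another; CATEGORICAL,
// DATETIME_NANO_TZ and EXTENSION columns instead each receive their own
// singleton writer, keyed by column index in singleton_blocks_.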
2103
2104 Status GetWriter(int i, std::shared_ptr<PandasWriter>* block) {
2105 PandasWriter::type output_type = this->column_types_[i];
2106 switch (output_type) {
2107 case PandasWriter::CATEGORICAL:
2108 case PandasWriter::DATETIME_NANO_TZ:
2109 case PandasWriter::EXTENSION: {
2110 auto it = this->singleton_blocks_.find(i);
2111 if (it == this->singleton_blocks_.end()) {
2112 return Status::KeyError("No block allocated");
2113 }
2114 *block = it->second;
2115 } break;
2116 default:
2117 auto it = this->blocks_.find(output_type);
2118 if (it == this->blocks_.end()) {
2119 return Status::KeyError("No block allocated");
2120 }
2121 *block = it->second;
2122 break;
2123 }
2124 return Status::OK();
2125 }
2126
2127 Status WriteTableToBlocks() {
2128 auto WriteColumn = [this](int i) {
2129 std::shared_ptr<PandasWriter> block;
2130 RETURN_NOT_OK(this->GetWriter(i, &block));
2131 // ARROW-3789 Use std::move on the array to permit self-destructing
2132 return block->Write(std::move(arrays_[i]), i, this->column_block_placement_[i]);
2133 };
2134
2135 return OptionalParallelFor(options_.use_threads, num_columns_, WriteColumn);
2136 }
2137
2138 private:
2139 // column num -> block type id
2140 std::vector<PandasWriter::type> column_types_;
2141
2142 // block type -> type count
2143 std::unordered_map<int, int> block_sizes_;
2144 std::unordered_map<int, const DataType*> block_types_;
2145
2146 // block type -> block
2147 WriterMap blocks_;
2148
2149 WriterMap singleton_blocks_;
2150 };
2151
2152 /// \brief Create blocks for the pandas.DataFrame block manager using a
2153 /// one-block-per-column strategy. This permits some zero-copy optimizations as
2154 /// well as the ability for the table to "self-destruct" if selected by the user.
2155 class SplitBlockCreator : public PandasBlockCreator {
2156 public:
2157 using PandasBlockCreator::PandasBlockCreator;
2158
2159 Status GetWriter(int i, std::shared_ptr<PandasWriter>* writer) {
2160 PandasWriter::type output_type = PandasWriter::OBJECT;
2161 const DataType& type = *arrays_[i]->type();
2162 if (options_.extension_columns.count(fields_[i]->name())) {
2163 output_type = PandasWriter::EXTENSION;
2164 } else {
2165 // Null count needed to determine output type
2166 RETURN_NOT_OK(GetPandasWriterType(*arrays_[i], options_, &output_type));
2167 }
2168 return MakeWriter(this->options_, output_type, type, num_rows_, 1, writer);
2169 }
2170
2171 Status Convert(PyObject** out) override {
2172 PyAcquireGIL lock;
2173
2174 PyObject* result = PyList_New(0);
2175 RETURN_IF_PYERROR();
2176
2177 for (int i = 0; i < num_columns_; ++i) {
2178 std::shared_ptr<PandasWriter> writer;
2179 RETURN_NOT_OK(GetWriter(i, &writer));
2180 // ARROW-3789 Use std::move on the array to permit self-destructing
2181 RETURN_NOT_OK(writer->Write(std::move(arrays_[i]), i, /*rel_placement=*/0));
2182
2183 PyObject* item;
2184 RETURN_NOT_OK(writer->GetDataFrameResult(&item));
2185 if (PyList_Append(result, item) < 0) {
2186 RETURN_IF_PYERROR();
2187 }
2188 // PyList_Append increments object refcount
2189 Py_DECREF(item);
2190 }
2191
2192 *out = result;
2193 return Status::OK();
2194 }
2195
2196 private:
2197 std::vector<std::shared_ptr<PandasWriter>> writers_;
2198 };
2199
2200 Status ConvertCategoricals(const PandasOptions& options, ChunkedArrayVector* arrays,
2201 FieldVector* fields) {
2202 std::vector<int> columns_to_encode;
2203
2204 // For Categorical conversions
2205 auto EncodeColumn = [&](int j) {
2206 int i = columns_to_encode[j];
2207 if (options.zero_copy_only) {
2208 return Status::Invalid("Need to dictionary encode a column, but ",
2209 "only zero-copy conversions allowed");
2210 }
2211 compute::ExecContext ctx(options.pool);
2212 ARROW_ASSIGN_OR_RAISE(
2213 Datum out, DictionaryEncode((*arrays)[i],
2214 compute::DictionaryEncodeOptions::Defaults(), &ctx));
2215 (*arrays)[i] = out.chunked_array();
2216 (*fields)[i] = (*fields)[i]->WithType((*arrays)[i]->type());
2217 return Status::OK();
2218 };
2219
2220 if (!options.categorical_columns.empty()) {
2221 for (int i = 0; i < static_cast<int>(arrays->size()); i++) {
2222 if ((*arrays)[i]->type()->id() != Type::DICTIONARY &&
2223 options.categorical_columns.count((*fields)[i]->name())) {
2224 columns_to_encode.push_back(i);
2225 }
2226 }
2227 }
2228 if (options.strings_to_categorical) {
2229 for (int i = 0; i < static_cast<int>(arrays->size()); i++) {
2230 if (is_base_binary_like((*arrays)[i]->type()->id())) {
2231 columns_to_encode.push_back(i);
2232 }
2233 }
2234 }
2235 return OptionalParallelFor(options.use_threads,
2236 static_cast<int>(columns_to_encode.size()), EncodeColumn);
2237 }
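// Sketch of the effect of ConvertCategoricals (illustrative): with
// options.categorical_columns containing a string column's name (or with
// options.strings_to_categorical set), that column is dictionary-encoded via
// compute::DictionaryEncode, its field is rewritten to the resulting
// dictionary type, and the writer-selection step later routes it to
// PandasWriter::CATEGORICAL. With options.zero_copy_only set, the required
// encoding step fails with Status::Invalid instead.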
2238
2239 } // namespace
2240
2241 Status ConvertArrayToPandas(const PandasOptions& options, std::shared_ptr<Array> arr,
2242 PyObject* py_ref, PyObject** out) {
2243 return ConvertChunkedArrayToPandas(
2244 options, std::make_shared<ChunkedArray>(std::move(arr)), py_ref, out);
2245 }
2246
2247 Status ConvertChunkedArrayToPandas(const PandasOptions& options,
2248 std::shared_ptr<ChunkedArray> arr, PyObject* py_ref,
2249 PyObject** out) {
2250 if (options.decode_dictionaries && arr->type()->id() == Type::DICTIONARY) {
2251 const auto& dense_type =
2252 checked_cast<const DictionaryType&>(*arr->type()).value_type();
2253 RETURN_NOT_OK(DecodeDictionaries(options.pool, dense_type, &arr));
2254 DCHECK_NE(arr->type()->id(), Type::DICTIONARY);
2255
2256 // The original Python DictionaryArray won't own the memory anymore, since
2257 // decoding built a new array; let the final resulting numpy array own the
2258 // memory through a Capsule instead
2259 py_ref = nullptr;
2260 }
2261
2262 if (options.strings_to_categorical && is_base_binary_like(arr->type()->id())) {
2263 if (options.zero_copy_only) {
2264 return Status::Invalid("Need to dictionary encode a column, but ",
2265 "only zero-copy conversions allowed");
2266 }
2267 compute::ExecContext ctx(options.pool);
2268 ARROW_ASSIGN_OR_RAISE(
2269 Datum out,
2270 DictionaryEncode(arr, compute::DictionaryEncodeOptions::Defaults(), &ctx));
2271 arr = out.chunked_array();
2272 }
2273
2274 PandasOptions modified_options = options;
2275 modified_options.strings_to_categorical = false;
2276
2277 // ARROW-7596: We permit the hybrid Series/DataFrame code path to do zero copy
2278 // optimizations that we do not allow in the default case when converting
2279 // Table->DataFrame
2280 modified_options.allow_zero_copy_blocks = true;
2281
2282 PandasWriter::type output_type;
2283 RETURN_NOT_OK(GetPandasWriterType(*arr, modified_options, &output_type));
2284 if (options.decode_dictionaries) {
2285 DCHECK_NE(output_type, PandasWriter::CATEGORICAL);
2286 }
2287
2288 std::shared_ptr<PandasWriter> writer;
2289 RETURN_NOT_OK(MakeWriter(modified_options, output_type, *arr->type(), arr->length(),
2290 /*num_columns=*/1, &writer));
2291 RETURN_NOT_OK(writer->TransferSingle(std::move(arr), py_ref));
2292 return writer->GetSeriesResult(out);
2293 }
2294
2295 Status ConvertTableToPandas(const PandasOptions& options, std::shared_ptr<Table> table,
2296 PyObject** out) {
2297 ChunkedArrayVector arrays = table->columns();
2298 FieldVector fields = table->fields();
2299
2300 // ARROW-3789: allow "self-destructing" by releasing references to columns as
2301 // we convert them to pandas
2302 table = nullptr;
2303
2304 RETURN_NOT_OK(ConvertCategoricals(options, &arrays, &fields));
2305
2306 PandasOptions modified_options = options;
2307 modified_options.strings_to_categorical = false;
2308 modified_options.categorical_columns.clear();
2309
2310 if (options.split_blocks) {
2311 modified_options.allow_zero_copy_blocks = true;
2312 SplitBlockCreator helper(modified_options, std::move(fields), std::move(arrays));
2313 return helper.Convert(out);
2314 } else {
2315 ConsolidatedBlockCreator helper(modified_options, std::move(fields),
2316 std::move(arrays));
2317 return helper.Convert(out);
2318 }
2319 }
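// A minimal, illustrative call sequence for the entry point above (assumes a
// caller that already holds a std::shared_ptr<Table> named `table` and runs
// with the Python interpreter initialized; error handling via RETURN_NOT_OK):
//
//   PandasOptions options;
//   options.pool = default_memory_pool();
//   options.use_threads = true;    // parallelize the per-column writes
//   options.split_blocks = false;  // consolidated 2D blocks (default path)
//   PyObject* blocks = nullptr;
//   RETURN_NOT_OK(ConvertTableToPandas(options, table, &blocks));
//   // `blocks` is a Python list of per-block dicts that the Python side
//   // assembles into the final pandas.DataFrame.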
2320
2321 } // namespace py
2322 } // namespace arrow