1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
// under the License.
18 #include "arrow/python/serialize.h"
19 #include "arrow/python/numpy_interop.h"
28 #include <numpy/arrayobject.h>
29 #include <numpy/arrayscalars.h>
31 #include "arrow/array.h"
32 #include "arrow/array/builder_binary.h"
33 #include "arrow/array/builder_nested.h"
34 #include "arrow/array/builder_primitive.h"
35 #include "arrow/array/builder_union.h"
36 #include "arrow/io/interfaces.h"
37 #include "arrow/io/memory.h"
38 #include "arrow/ipc/util.h"
39 #include "arrow/ipc/writer.h"
40 #include "arrow/record_batch.h"
41 #include "arrow/result.h"
42 #include "arrow/tensor.h"
43 #include "arrow/util/logging.h"
45 #include "arrow/python/common.h"
46 #include "arrow/python/datetime.h"
47 #include "arrow/python/helpers.h"
48 #include "arrow/python/iterators.h"
49 #include "arrow/python/numpy_convert.h"
50 #include "arrow/python/platform.h"
51 #include "arrow/python/pyarrow.h"
53 constexpr int32_t kMaxRecursionDepth
= 100;
57 using internal::checked_cast
;
61 class SequenceBuilder
;
64 Status
Append(PyObject
* context
, PyObject
* elem
, SequenceBuilder
* builder
,
65 int32_t recursion_depth
, SerializedPyObject
* blobs_out
);
67 // A Sequence is a heterogeneous collections of elements. It can contain
68 // scalar Python types, lists, tuples, dictionaries, tensors and sparse tensors.
69 class SequenceBuilder
{
71 explicit SequenceBuilder(MemoryPool
* pool
= default_memory_pool())
73 types_(::arrow::int8(), pool
),
74 offsets_(::arrow::int32(), pool
),
75 type_map_(PythonType::NUM_PYTHON_TYPES
, -1) {
76 auto null_builder
= std::make_shared
<NullBuilder
>(pool
);
77 auto initial_ty
= dense_union({field("0", null())});
78 builder_
.reset(new DenseUnionBuilder(pool
, {null_builder
}, initial_ty
));
81 // Appending a none to the sequence
82 Status
AppendNone() { return builder_
->AppendNull(); }
84 template <typename BuilderType
, typename MakeBuilderFn
>
85 Status
CreateAndUpdate(std::shared_ptr
<BuilderType
>* child_builder
, int8_t tag
,
86 MakeBuilderFn make_builder
) {
87 if (!*child_builder
) {
88 child_builder
->reset(make_builder());
89 std::ostringstream convert
;
90 convert
.imbue(std::locale::classic());
91 convert
<< static_cast<int>(tag
);
92 type_map_
[tag
] = builder_
->AppendChild(*child_builder
, convert
.str());
94 return builder_
->Append(type_map_
[tag
]);
97 template <typename BuilderType
, typename T
>
98 Status
AppendPrimitive(std::shared_ptr
<BuilderType
>* child_builder
, const T val
,
101 CreateAndUpdate(child_builder
, tag
, [this]() { return new BuilderType(pool_
); }));
102 return (*child_builder
)->Append(val
);
105 // Appending a boolean to the sequence
106 Status
AppendBool(const bool data
) {
107 return AppendPrimitive(&bools_
, data
, PythonType::BOOL
);
110 // Appending an int64_t to the sequence
111 Status
AppendInt64(const int64_t data
) {
112 return AppendPrimitive(&ints_
, data
, PythonType::INT
);
115 // Append a list of bytes to the sequence
116 Status
AppendBytes(const uint8_t* data
, int32_t length
) {
117 RETURN_NOT_OK(CreateAndUpdate(&bytes_
, PythonType::BYTES
,
118 [this]() { return new BinaryBuilder(pool_
); }));
119 return bytes_
->Append(data
, length
);
122 // Appending a string to the sequence
123 Status
AppendString(const char* data
, int32_t length
) {
124 RETURN_NOT_OK(CreateAndUpdate(&strings_
, PythonType::STRING
,
125 [this]() { return new StringBuilder(pool_
); }));
126 return strings_
->Append(data
, length
);
129 // Appending a half_float to the sequence
130 Status
AppendHalfFloat(const npy_half data
) {
131 return AppendPrimitive(&half_floats_
, data
, PythonType::HALF_FLOAT
);
134 // Appending a float to the sequence
135 Status
AppendFloat(const float data
) {
136 return AppendPrimitive(&floats_
, data
, PythonType::FLOAT
);
139 // Appending a double to the sequence
140 Status
AppendDouble(const double data
) {
141 return AppendPrimitive(&doubles_
, data
, PythonType::DOUBLE
);
144 // Appending a Date64 timestamp to the sequence
145 Status
AppendDate64(const int64_t timestamp
) {
146 return AppendPrimitive(&date64s_
, timestamp
, PythonType::DATE64
);
149 // Appending a tensor to the sequence
151 // \param tensor_index Index of the tensor in the object.
152 Status
AppendTensor(const int32_t tensor_index
) {
153 RETURN_NOT_OK(CreateAndUpdate(&tensor_indices_
, PythonType::TENSOR
,
154 [this]() { return new Int32Builder(pool_
); }));
155 return tensor_indices_
->Append(tensor_index
);
158 // Appending a sparse coo tensor to the sequence
160 // \param sparse_coo_tensor_index Index of the sparse coo tensor in the object.
161 Status
AppendSparseCOOTensor(const int32_t sparse_coo_tensor_index
) {
162 RETURN_NOT_OK(CreateAndUpdate(&sparse_coo_tensor_indices_
,
163 PythonType::SPARSECOOTENSOR
,
164 [this]() { return new Int32Builder(pool_
); }));
165 return sparse_coo_tensor_indices_
->Append(sparse_coo_tensor_index
);
168 // Appending a sparse csr matrix to the sequence
170 // \param sparse_csr_matrix_index Index of the sparse csr matrix in the object.
171 Status
AppendSparseCSRMatrix(const int32_t sparse_csr_matrix_index
) {
172 RETURN_NOT_OK(CreateAndUpdate(&sparse_csr_matrix_indices_
,
173 PythonType::SPARSECSRMATRIX
,
174 [this]() { return new Int32Builder(pool_
); }));
175 return sparse_csr_matrix_indices_
->Append(sparse_csr_matrix_index
);
178 // Appending a sparse csc matrix to the sequence
180 // \param sparse_csc_matrix_index Index of the sparse csc matrix in the object.
181 Status
AppendSparseCSCMatrix(const int32_t sparse_csc_matrix_index
) {
182 RETURN_NOT_OK(CreateAndUpdate(&sparse_csc_matrix_indices_
,
183 PythonType::SPARSECSCMATRIX
,
184 [this]() { return new Int32Builder(pool_
); }));
185 return sparse_csc_matrix_indices_
->Append(sparse_csc_matrix_index
);
188 // Appending a sparse csf tensor to the sequence
190 // \param sparse_csf_tensor_index Index of the sparse csf tensor in the object.
191 Status
AppendSparseCSFTensor(const int32_t sparse_csf_tensor_index
) {
192 RETURN_NOT_OK(CreateAndUpdate(&sparse_csf_tensor_indices_
,
193 PythonType::SPARSECSFTENSOR
,
194 [this]() { return new Int32Builder(pool_
); }));
195 return sparse_csf_tensor_indices_
->Append(sparse_csf_tensor_index
);
198 // Appending a numpy ndarray to the sequence
200 // \param tensor_index Index of the tensor in the object.
201 Status
AppendNdarray(const int32_t ndarray_index
) {
202 RETURN_NOT_OK(CreateAndUpdate(&ndarray_indices_
, PythonType::NDARRAY
,
203 [this]() { return new Int32Builder(pool_
); }));
204 return ndarray_indices_
->Append(ndarray_index
);
207 // Appending a buffer to the sequence
209 // \param buffer_index Index of the buffer in the object.
210 Status
AppendBuffer(const int32_t buffer_index
) {
211 RETURN_NOT_OK(CreateAndUpdate(&buffer_indices_
, PythonType::BUFFER
,
212 [this]() { return new Int32Builder(pool_
); }));
213 return buffer_indices_
->Append(buffer_index
);
216 Status
AppendSequence(PyObject
* context
, PyObject
* sequence
, int8_t tag
,
217 std::shared_ptr
<ListBuilder
>& target_sequence
,
218 std::unique_ptr
<SequenceBuilder
>& values
, int32_t recursion_depth
,
219 SerializedPyObject
* blobs_out
) {
220 if (recursion_depth
>= kMaxRecursionDepth
) {
221 return Status::NotImplemented(
222 "This object exceeds the maximum recursion depth. It may contain itself "
225 RETURN_NOT_OK(CreateAndUpdate(&target_sequence
, tag
, [this, &values
]() {
226 values
.reset(new SequenceBuilder(pool_
));
227 return new ListBuilder(pool_
, values
->builder());
229 RETURN_NOT_OK(target_sequence
->Append());
230 return internal::VisitIterable(
231 sequence
, [&](PyObject
* obj
, bool* keep_going
/* unused */) {
232 return Append(context
, obj
, values
.get(), recursion_depth
, blobs_out
);
236 Status
AppendList(PyObject
* context
, PyObject
* list
, int32_t recursion_depth
,
237 SerializedPyObject
* blobs_out
) {
238 return AppendSequence(context
, list
, PythonType::LIST
, lists_
, list_values_
,
239 recursion_depth
+ 1, blobs_out
);
242 Status
AppendTuple(PyObject
* context
, PyObject
* tuple
, int32_t recursion_depth
,
243 SerializedPyObject
* blobs_out
) {
244 return AppendSequence(context
, tuple
, PythonType::TUPLE
, tuples_
, tuple_values_
,
245 recursion_depth
+ 1, blobs_out
);
248 Status
AppendSet(PyObject
* context
, PyObject
* set
, int32_t recursion_depth
,
249 SerializedPyObject
* blobs_out
) {
250 return AppendSequence(context
, set
, PythonType::SET
, sets_
, set_values_
,
251 recursion_depth
+ 1, blobs_out
);
254 Status
AppendDict(PyObject
* context
, PyObject
* dict
, int32_t recursion_depth
,
255 SerializedPyObject
* blobs_out
);
257 // Finish building the sequence and return the result.
258 // Input arrays may be nullptr
259 Status
Finish(std::shared_ptr
<Array
>* out
) { return builder_
->Finish(out
); }
261 std::shared_ptr
<DenseUnionBuilder
> builder() { return builder_
; }
267 Int32Builder offsets_
;
269 /// Mapping from PythonType to child index
270 std::vector
<int8_t> type_map_
;
272 std::shared_ptr
<BooleanBuilder
> bools_
;
273 std::shared_ptr
<Int64Builder
> ints_
;
274 std::shared_ptr
<BinaryBuilder
> bytes_
;
275 std::shared_ptr
<StringBuilder
> strings_
;
276 std::shared_ptr
<HalfFloatBuilder
> half_floats_
;
277 std::shared_ptr
<FloatBuilder
> floats_
;
278 std::shared_ptr
<DoubleBuilder
> doubles_
;
279 std::shared_ptr
<Date64Builder
> date64s_
;
281 std::unique_ptr
<SequenceBuilder
> list_values_
;
282 std::shared_ptr
<ListBuilder
> lists_
;
283 std::unique_ptr
<DictBuilder
> dict_values_
;
284 std::shared_ptr
<ListBuilder
> dicts_
;
285 std::unique_ptr
<SequenceBuilder
> tuple_values_
;
286 std::shared_ptr
<ListBuilder
> tuples_
;
287 std::unique_ptr
<SequenceBuilder
> set_values_
;
288 std::shared_ptr
<ListBuilder
> sets_
;
290 std::shared_ptr
<Int32Builder
> tensor_indices_
;
291 std::shared_ptr
<Int32Builder
> sparse_coo_tensor_indices_
;
292 std::shared_ptr
<Int32Builder
> sparse_csr_matrix_indices_
;
293 std::shared_ptr
<Int32Builder
> sparse_csc_matrix_indices_
;
294 std::shared_ptr
<Int32Builder
> sparse_csf_tensor_indices_
;
295 std::shared_ptr
<Int32Builder
> ndarray_indices_
;
296 std::shared_ptr
<Int32Builder
> buffer_indices_
;
298 std::shared_ptr
<DenseUnionBuilder
> builder_
;
301 // Constructing dictionaries of key/value pairs. Sequences of
302 // keys and values are built separately using a pair of
303 // SequenceBuilders. The resulting Arrow representation
304 // can be obtained via the Finish method.
307 explicit DictBuilder(MemoryPool
* pool
= nullptr) : keys_(pool
), vals_(pool
) {
308 builder_
.reset(new StructBuilder(struct_({field("keys", dense_union(FieldVector
{})),
309 field("vals", dense_union(FieldVector
{}))}),
310 pool
, {keys_
.builder(), vals_
.builder()}));
313 // Builder for the keys of the dictionary
314 SequenceBuilder
& keys() { return keys_
; }
315 // Builder for the values of the dictionary
316 SequenceBuilder
& vals() { return vals_
; }
318 // Construct an Arrow StructArray representing the dictionary.
319 // Contains a field "keys" for the keys and "vals" for the values.
320 Status
Finish(std::shared_ptr
<Array
>* out
) { return builder_
->Finish(out
); }
322 std::shared_ptr
<StructBuilder
> builder() { return builder_
; }
325 SequenceBuilder keys_
;
326 SequenceBuilder vals_
;
327 std::shared_ptr
<StructBuilder
> builder_
;
330 Status
SequenceBuilder::AppendDict(PyObject
* context
, PyObject
* dict
,
331 int32_t recursion_depth
,
332 SerializedPyObject
* blobs_out
) {
333 if (recursion_depth
>= kMaxRecursionDepth
) {
334 return Status::NotImplemented(
335 "This object exceeds the maximum recursion depth. It may contain itself "
338 RETURN_NOT_OK(CreateAndUpdate(&dicts_
, PythonType::DICT
, [this]() {
339 dict_values_
.reset(new DictBuilder(pool_
));
340 return new ListBuilder(pool_
, dict_values_
->builder());
342 RETURN_NOT_OK(dicts_
->Append());
346 while (PyDict_Next(dict
, &pos
, &key
, &value
)) {
347 RETURN_NOT_OK(dict_values_
->builder()->Append());
349 Append(context
, key
, &dict_values_
->keys(), recursion_depth
+ 1, blobs_out
));
351 Append(context
, value
, &dict_values_
->vals(), recursion_depth
+ 1, blobs_out
));
354 // This block is used to decrement the reference counts of the results
355 // returned by the serialization callback, which is called in AppendArray,
356 // in DeserializeDict and in Append
357 static PyObject
* py_type
= PyUnicode_FromString("_pytype_");
358 if (PyDict_Contains(dict
, py_type
)) {
359 // If the dictionary contains the key "_pytype_", then the user has to
360 // have registered a callback.
361 if (context
== Py_None
) {
362 return Status::Invalid("No serialization callback set");
369 Status
CallCustomCallback(PyObject
* context
, PyObject
* method_name
, PyObject
* elem
,
371 if (context
== Py_None
) {
373 return Status::SerializationError("error while calling callback on ",
374 internal::PyObject_StdStringRepr(elem
),
375 ": handler not registered");
377 *result
= PyObject_CallMethodObjArgs(context
, method_name
, elem
, NULL
);
378 return CheckPyError();
382 Status
CallSerializeCallback(PyObject
* context
, PyObject
* value
,
383 PyObject
** serialized_object
) {
384 OwnedRef
method_name(PyUnicode_FromString("_serialize_callback"));
385 RETURN_NOT_OK(CallCustomCallback(context
, method_name
.obj(), value
, serialized_object
));
386 if (!PyDict_Check(*serialized_object
)) {
387 return Status::TypeError("serialization callback must return a valid dictionary");
392 Status
CallDeserializeCallback(PyObject
* context
, PyObject
* value
,
393 PyObject
** deserialized_object
) {
394 OwnedRef
method_name(PyUnicode_FromString("_deserialize_callback"));
395 return CallCustomCallback(context
, method_name
.obj(), value
, deserialized_object
);
398 Status
AppendArray(PyObject
* context
, PyArrayObject
* array
, SequenceBuilder
* builder
,
399 int32_t recursion_depth
, SerializedPyObject
* blobs_out
);
401 template <typename NumpyScalarObject
>
402 Status
AppendIntegerScalar(PyObject
* obj
, SequenceBuilder
* builder
) {
403 int64_t value
= reinterpret_cast<NumpyScalarObject
*>(obj
)->obval
;
404 return builder
->AppendInt64(value
);
407 // Append a potentially 64-bit wide unsigned Numpy scalar.
408 // Must check for overflow as we reinterpret it as signed int64.
409 template <typename NumpyScalarObject
>
410 Status
AppendLargeUnsignedScalar(PyObject
* obj
, SequenceBuilder
* builder
) {
411 constexpr uint64_t max_value
= std::numeric_limits
<int64_t>::max();
413 uint64_t value
= reinterpret_cast<NumpyScalarObject
*>(obj
)->obval
;
414 if (value
> max_value
) {
415 return Status::Invalid("cannot serialize Numpy uint64 scalar >= 2**63");
417 return builder
->AppendInt64(static_cast<int64_t>(value
));
420 Status
AppendScalar(PyObject
* obj
, SequenceBuilder
* builder
) {
421 if (PyArray_IsScalar(obj
, Bool
)) {
422 return builder
->AppendBool(reinterpret_cast<PyBoolScalarObject
*>(obj
)->obval
!= 0);
423 } else if (PyArray_IsScalar(obj
, Half
)) {
424 return builder
->AppendHalfFloat(reinterpret_cast<PyHalfScalarObject
*>(obj
)->obval
);
425 } else if (PyArray_IsScalar(obj
, Float
)) {
426 return builder
->AppendFloat(reinterpret_cast<PyFloatScalarObject
*>(obj
)->obval
);
427 } else if (PyArray_IsScalar(obj
, Double
)) {
428 return builder
->AppendDouble(reinterpret_cast<PyDoubleScalarObject
*>(obj
)->obval
);
430 if (PyArray_IsScalar(obj
, Byte
)) {
431 return AppendIntegerScalar
<PyByteScalarObject
>(obj
, builder
);
432 } else if (PyArray_IsScalar(obj
, Short
)) {
433 return AppendIntegerScalar
<PyShortScalarObject
>(obj
, builder
);
434 } else if (PyArray_IsScalar(obj
, Int
)) {
435 return AppendIntegerScalar
<PyIntScalarObject
>(obj
, builder
);
436 } else if (PyArray_IsScalar(obj
, Long
)) {
437 return AppendIntegerScalar
<PyLongScalarObject
>(obj
, builder
);
438 } else if (PyArray_IsScalar(obj
, LongLong
)) {
439 return AppendIntegerScalar
<PyLongLongScalarObject
>(obj
, builder
);
440 } else if (PyArray_IsScalar(obj
, Int64
)) {
441 return AppendIntegerScalar
<PyInt64ScalarObject
>(obj
, builder
);
442 } else if (PyArray_IsScalar(obj
, UByte
)) {
443 return AppendIntegerScalar
<PyUByteScalarObject
>(obj
, builder
);
444 } else if (PyArray_IsScalar(obj
, UShort
)) {
445 return AppendIntegerScalar
<PyUShortScalarObject
>(obj
, builder
);
446 } else if (PyArray_IsScalar(obj
, UInt
)) {
447 return AppendIntegerScalar
<PyUIntScalarObject
>(obj
, builder
);
448 } else if (PyArray_IsScalar(obj
, ULong
)) {
449 return AppendLargeUnsignedScalar
<PyULongScalarObject
>(obj
, builder
);
450 } else if (PyArray_IsScalar(obj
, ULongLong
)) {
451 return AppendLargeUnsignedScalar
<PyULongLongScalarObject
>(obj
, builder
);
452 } else if (PyArray_IsScalar(obj
, UInt64
)) {
453 return AppendLargeUnsignedScalar
<PyUInt64ScalarObject
>(obj
, builder
);
455 return Status::NotImplemented("Numpy scalar type not recognized");
458 Status
Append(PyObject
* context
, PyObject
* elem
, SequenceBuilder
* builder
,
459 int32_t recursion_depth
, SerializedPyObject
* blobs_out
) {
460 // The bool case must precede the int case (PyInt_Check passes for bools)
461 if (PyBool_Check(elem
)) {
462 RETURN_NOT_OK(builder
->AppendBool(elem
== Py_True
));
463 } else if (PyArray_DescrFromScalar(elem
)->type_num
== NPY_HALF
) {
464 npy_half halffloat
= reinterpret_cast<PyHalfScalarObject
*>(elem
)->obval
;
465 RETURN_NOT_OK(builder
->AppendHalfFloat(halffloat
));
466 } else if (PyFloat_Check(elem
)) {
467 RETURN_NOT_OK(builder
->AppendDouble(PyFloat_AS_DOUBLE(elem
)));
468 } else if (PyLong_Check(elem
)) {
470 int64_t data
= PyLong_AsLongLongAndOverflow(elem
, &overflow
);
472 RETURN_NOT_OK(builder
->AppendInt64(data
));
474 // Attempt to serialize the object using the custom callback.
475 PyObject
* serialized_object
;
476 // The reference count of serialized_object will be decremented in SerializeDict
477 RETURN_NOT_OK(CallSerializeCallback(context
, elem
, &serialized_object
));
479 builder
->AppendDict(context
, serialized_object
, recursion_depth
, blobs_out
));
481 } else if (PyBytes_Check(elem
)) {
482 auto data
= reinterpret_cast<uint8_t*>(PyBytes_AS_STRING(elem
));
484 RETURN_NOT_OK(internal::CastSize(PyBytes_GET_SIZE(elem
), &size
));
485 RETURN_NOT_OK(builder
->AppendBytes(data
, size
));
486 } else if (PyUnicode_Check(elem
)) {
487 ARROW_ASSIGN_OR_RAISE(auto view
, PyBytesView::FromUnicode(elem
));
489 RETURN_NOT_OK(internal::CastSize(view
.size
, &size
));
490 RETURN_NOT_OK(builder
->AppendString(view
.bytes
, size
));
491 } else if (PyList_CheckExact(elem
)) {
492 RETURN_NOT_OK(builder
->AppendList(context
, elem
, recursion_depth
, blobs_out
));
493 } else if (PyDict_CheckExact(elem
)) {
494 RETURN_NOT_OK(builder
->AppendDict(context
, elem
, recursion_depth
, blobs_out
));
495 } else if (PyTuple_CheckExact(elem
)) {
496 RETURN_NOT_OK(builder
->AppendTuple(context
, elem
, recursion_depth
, blobs_out
));
497 } else if (PySet_Check(elem
)) {
498 RETURN_NOT_OK(builder
->AppendSet(context
, elem
, recursion_depth
, blobs_out
));
499 } else if (PyArray_IsScalar(elem
, Generic
)) {
500 RETURN_NOT_OK(AppendScalar(elem
, builder
));
501 } else if (PyArray_CheckExact(elem
)) {
502 RETURN_NOT_OK(AppendArray(context
, reinterpret_cast<PyArrayObject
*>(elem
), builder
,
503 recursion_depth
, blobs_out
));
504 } else if (elem
== Py_None
) {
505 RETURN_NOT_OK(builder
->AppendNone());
506 } else if (PyDateTime_Check(elem
)) {
507 PyDateTime_DateTime
* datetime
= reinterpret_cast<PyDateTime_DateTime
*>(elem
);
508 RETURN_NOT_OK(builder
->AppendDate64(internal::PyDateTime_to_us(datetime
)));
509 } else if (is_buffer(elem
)) {
510 RETURN_NOT_OK(builder
->AppendBuffer(static_cast<int32_t>(blobs_out
->buffers
.size())));
511 ARROW_ASSIGN_OR_RAISE(auto buffer
, unwrap_buffer(elem
));
512 blobs_out
->buffers
.push_back(buffer
);
513 } else if (is_tensor(elem
)) {
514 RETURN_NOT_OK(builder
->AppendTensor(static_cast<int32_t>(blobs_out
->tensors
.size())));
515 ARROW_ASSIGN_OR_RAISE(auto tensor
, unwrap_tensor(elem
));
516 blobs_out
->tensors
.push_back(tensor
);
517 } else if (is_sparse_coo_tensor(elem
)) {
518 RETURN_NOT_OK(builder
->AppendSparseCOOTensor(
519 static_cast<int32_t>(blobs_out
->sparse_tensors
.size())));
520 ARROW_ASSIGN_OR_RAISE(auto tensor
, unwrap_sparse_coo_tensor(elem
));
521 blobs_out
->sparse_tensors
.push_back(tensor
);
522 } else if (is_sparse_csr_matrix(elem
)) {
523 RETURN_NOT_OK(builder
->AppendSparseCSRMatrix(
524 static_cast<int32_t>(blobs_out
->sparse_tensors
.size())));
525 ARROW_ASSIGN_OR_RAISE(auto matrix
, unwrap_sparse_csr_matrix(elem
));
526 blobs_out
->sparse_tensors
.push_back(matrix
);
527 } else if (is_sparse_csc_matrix(elem
)) {
528 RETURN_NOT_OK(builder
->AppendSparseCSCMatrix(
529 static_cast<int32_t>(blobs_out
->sparse_tensors
.size())));
530 ARROW_ASSIGN_OR_RAISE(auto matrix
, unwrap_sparse_csc_matrix(elem
));
531 blobs_out
->sparse_tensors
.push_back(matrix
);
532 } else if (is_sparse_csf_tensor(elem
)) {
533 RETURN_NOT_OK(builder
->AppendSparseCSFTensor(
534 static_cast<int32_t>(blobs_out
->sparse_tensors
.size())));
535 ARROW_ASSIGN_OR_RAISE(auto tensor
, unwrap_sparse_csf_tensor(elem
));
536 blobs_out
->sparse_tensors
.push_back(tensor
);
538 // Attempt to serialize the object using the custom callback.
539 PyObject
* serialized_object
;
540 // The reference count of serialized_object will be decremented in SerializeDict
541 RETURN_NOT_OK(CallSerializeCallback(context
, elem
, &serialized_object
));
543 builder
->AppendDict(context
, serialized_object
, recursion_depth
, blobs_out
));
// Serialize a numpy ndarray: supported dtypes are appended as an ndarray
// blob (a Tensor stored out-of-band); other dtypes are handed to the custom
// serialization callback and stored as a dict.
// NOTE(review): the extraction dropped original lines 551-563 — the dtype
// switch header and its case labels between PyArray_TYPE and the ndarray
// branch below — restore them from upstream before building.
548 Status
AppendArray(PyObject
* context
, PyArrayObject
* array
, SequenceBuilder
* builder
,
549 int32_t recursion_depth
, SerializedPyObject
* blobs_out
) {
550 int dtype
= PyArray_TYPE(array
);
// Natively supported dtype: record the blob index inline, convert the
// ndarray to an arrow::Tensor and store it out-of-band.
564 builder
->AppendNdarray(static_cast<int32_t>(blobs_out
->ndarrays
.size())));
565 std::shared_ptr
<Tensor
> tensor
;
566 RETURN_NOT_OK(NdarrayToTensor(default_memory_pool(),
567 reinterpret_cast<PyObject
*>(array
), {}, &tensor
));
568 blobs_out
->ndarrays
.push_back(tensor
);
// Fallback: serialize through the user callback, one recursion level deeper.
571 PyObject
* serialized_object
;
572 // The reference count of serialized_object will be decremented in SerializeDict
573 RETURN_NOT_OK(CallSerializeCallback(context
, reinterpret_cast<PyObject
*>(array
),
574 &serialized_object
));
575 RETURN_NOT_OK(builder
->AppendDict(context
, serialized_object
, recursion_depth
+ 1,
582 std::shared_ptr
<RecordBatch
> MakeBatch(std::shared_ptr
<Array
> data
) {
583 auto field
= std::make_shared
<Field
>("list", data
->type());
584 auto schema
= ::arrow::schema({field
});
585 return RecordBatch::Make(schema
, data
->length(), {data
});
588 Status
SerializeObject(PyObject
* context
, PyObject
* sequence
, SerializedPyObject
* out
) {
590 SequenceBuilder builder
;
591 RETURN_NOT_OK(internal::VisitIterable(
592 sequence
, [&](PyObject
* obj
, bool* keep_going
/* unused */) {
593 return Append(context
, obj
, &builder
, 0, out
);
595 std::shared_ptr
<Array
> array
;
596 RETURN_NOT_OK(builder
.Finish(&array
));
597 out
->batch
= MakeBatch(array
);
601 Status
SerializeNdarray(std::shared_ptr
<Tensor
> tensor
, SerializedPyObject
* out
) {
602 std::shared_ptr
<Array
> array
;
603 SequenceBuilder builder
;
604 RETURN_NOT_OK(builder
.AppendNdarray(static_cast<int32_t>(out
->ndarrays
.size())));
605 out
->ndarrays
.push_back(tensor
);
606 RETURN_NOT_OK(builder
.Finish(&array
));
607 out
->batch
= MakeBatch(array
);
611 Status
WriteNdarrayHeader(std::shared_ptr
<DataType
> dtype
,
612 const std::vector
<int64_t>& shape
, int64_t tensor_num_bytes
,
613 io::OutputStream
* dst
) {
614 auto empty_tensor
= std::make_shared
<Tensor
>(
615 dtype
, std::make_shared
<Buffer
>(nullptr, tensor_num_bytes
), shape
);
616 SerializedPyObject serialized_tensor
;
617 RETURN_NOT_OK(SerializeNdarray(empty_tensor
, &serialized_tensor
));
618 return serialized_tensor
.WriteTo(dst
);
621 SerializedPyObject::SerializedPyObject()
622 : ipc_options(ipc::IpcWriteOptions::Defaults()) {}
624 Status
SerializedPyObject::WriteTo(io::OutputStream
* dst
) {
625 int32_t num_tensors
= static_cast<int32_t>(this->tensors
.size());
626 int32_t num_sparse_tensors
= static_cast<int32_t>(this->sparse_tensors
.size());
627 int32_t num_ndarrays
= static_cast<int32_t>(this->ndarrays
.size());
628 int32_t num_buffers
= static_cast<int32_t>(this->buffers
.size());
630 dst
->Write(reinterpret_cast<const uint8_t*>(&num_tensors
), sizeof(int32_t)));
632 dst
->Write(reinterpret_cast<const uint8_t*>(&num_sparse_tensors
), sizeof(int32_t)));
634 dst
->Write(reinterpret_cast<const uint8_t*>(&num_ndarrays
), sizeof(int32_t)));
636 dst
->Write(reinterpret_cast<const uint8_t*>(&num_buffers
), sizeof(int32_t)));
638 // Align stream to 8-byte offset
639 RETURN_NOT_OK(ipc::AlignStream(dst
, ipc::kArrowIpcAlignment
));
640 RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch
}, this->ipc_options
, dst
));
642 // Align stream to 64-byte offset so tensor bodies are 64-byte aligned
643 RETURN_NOT_OK(ipc::AlignStream(dst
, ipc::kTensorAlignment
));
645 int32_t metadata_length
;
647 for (const auto& tensor
: this->tensors
) {
648 RETURN_NOT_OK(ipc::WriteTensor(*tensor
, dst
, &metadata_length
, &body_length
));
649 RETURN_NOT_OK(ipc::AlignStream(dst
, ipc::kTensorAlignment
));
652 for (const auto& sparse_tensor
: this->sparse_tensors
) {
654 ipc::WriteSparseTensor(*sparse_tensor
, dst
, &metadata_length
, &body_length
));
655 RETURN_NOT_OK(ipc::AlignStream(dst
, ipc::kTensorAlignment
));
658 for (const auto& tensor
: this->ndarrays
) {
659 RETURN_NOT_OK(ipc::WriteTensor(*tensor
, dst
, &metadata_length
, &body_length
));
660 RETURN_NOT_OK(ipc::AlignStream(dst
, ipc::kTensorAlignment
));
663 for (const auto& buffer
: this->buffers
) {
664 int64_t size
= buffer
->size();
665 RETURN_NOT_OK(dst
->Write(reinterpret_cast<const uint8_t*>(&size
), sizeof(int64_t)));
666 RETURN_NOT_OK(dst
->Write(buffer
->data(), size
));
674 Status
CountSparseTensors(
675 const std::vector
<std::shared_ptr
<SparseTensor
>>& sparse_tensors
, PyObject
** out
) {
676 OwnedRef
num_sparse_tensors(PyDict_New());
683 for (const auto& sparse_tensor
: sparse_tensors
) {
684 switch (sparse_tensor
->format_id()) {
685 case SparseTensorFormat::COO
:
688 case SparseTensorFormat::CSR
:
691 case SparseTensorFormat::CSC
:
694 case SparseTensorFormat::CSF
:
696 ndim_csf
+= sparse_tensor
->ndim();
701 PyDict_SetItemString(num_sparse_tensors
.obj(), "coo", PyLong_FromSize_t(num_coo
));
702 PyDict_SetItemString(num_sparse_tensors
.obj(), "csr", PyLong_FromSize_t(num_csr
));
703 PyDict_SetItemString(num_sparse_tensors
.obj(), "csc", PyLong_FromSize_t(num_csc
));
704 PyDict_SetItemString(num_sparse_tensors
.obj(), "csf", PyLong_FromSize_t(num_csf
));
705 PyDict_SetItemString(num_sparse_tensors
.obj(), "ndim_csf", PyLong_FromSize_t(ndim_csf
));
708 *out
= num_sparse_tensors
.detach();
714 Status
SerializedPyObject::GetComponents(MemoryPool
* memory_pool
, PyObject
** out
) {
717 OwnedRef
result(PyDict_New());
718 PyObject
* buffers
= PyList_New(0);
719 PyObject
* num_sparse_tensors
= nullptr;
721 // TODO(wesm): Not sure how pedantic we need to be about checking the return
722 // values of these functions. There are other places where we do not check
723 // PyDict_SetItem/SetItemString return value, but these failures would be
725 PyDict_SetItemString(result
.obj(), "num_tensors",
726 PyLong_FromSize_t(this->tensors
.size()));
727 RETURN_NOT_OK(CountSparseTensors(this->sparse_tensors
, &num_sparse_tensors
));
728 PyDict_SetItemString(result
.obj(), "num_sparse_tensors", num_sparse_tensors
);
729 PyDict_SetItemString(result
.obj(), "ndim_csf", num_sparse_tensors
);
730 PyDict_SetItemString(result
.obj(), "num_ndarrays",
731 PyLong_FromSize_t(this->ndarrays
.size()));
732 PyDict_SetItemString(result
.obj(), "num_buffers",
733 PyLong_FromSize_t(this->buffers
.size()));
734 PyDict_SetItemString(result
.obj(), "data", buffers
);
739 auto PushBuffer
= [&buffers
](const std::shared_ptr
<Buffer
>& buffer
) {
740 PyObject
* wrapped_buffer
= wrap_buffer(buffer
);
742 if (PyList_Append(buffers
, wrapped_buffer
) < 0) {
743 Py_DECREF(wrapped_buffer
);
746 Py_DECREF(wrapped_buffer
);
750 constexpr int64_t kInitialCapacity
= 1024;
752 // Write the record batch describing the object structure
754 ARROW_ASSIGN_OR_RAISE(auto stream
,
755 io::BufferOutputStream::Create(kInitialCapacity
, memory_pool
));
757 ipc::WriteRecordBatchStream({this->batch
}, this->ipc_options
, stream
.get()));
758 ARROW_ASSIGN_OR_RAISE(auto buffer
, stream
->Finish());
761 RETURN_NOT_OK(PushBuffer(buffer
));
763 // For each tensor, get a metadata buffer and a buffer for the body
764 for (const auto& tensor
: this->tensors
) {
765 ARROW_ASSIGN_OR_RAISE(std::unique_ptr
<ipc::Message
> message
,
766 ipc::GetTensorMessage(*tensor
, memory_pool
));
767 RETURN_NOT_OK(PushBuffer(message
->metadata()));
768 RETURN_NOT_OK(PushBuffer(message
->body()));
771 // For each sparse tensor, get a metadata buffer and buffers containing index and data
772 for (const auto& sparse_tensor
: this->sparse_tensors
) {
773 ipc::IpcPayload payload
;
774 RETURN_NOT_OK(ipc::GetSparseTensorPayload(*sparse_tensor
, memory_pool
, &payload
));
775 RETURN_NOT_OK(PushBuffer(payload
.metadata
));
776 for (const auto& body
: payload
.body_buffers
) {
777 RETURN_NOT_OK(PushBuffer(body
));
781 // For each ndarray, get a metadata buffer and a buffer for the body
782 for (const auto& ndarray
: this->ndarrays
) {
783 ARROW_ASSIGN_OR_RAISE(std::unique_ptr
<ipc::Message
> message
,
784 ipc::GetTensorMessage(*ndarray
, memory_pool
));
785 RETURN_NOT_OK(PushBuffer(message
->metadata()));
786 RETURN_NOT_OK(PushBuffer(message
->body()));
789 for (const auto& buf
: this->buffers
) {
790 RETURN_NOT_OK(PushBuffer(buf
));
793 *out
= result
.detach();