]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/python/serialize.cc
bump version to 19.2.0-pve1
[ceph.git] / ceph / src / arrow / cpp / src / arrow / python / serialize.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/python/serialize.h"
19 #include "arrow/python/numpy_interop.h"
20
21 #include <cstdint>
22 #include <limits>
23 #include <memory>
24 #include <sstream>
25 #include <string>
26 #include <vector>
27
28 #include <numpy/arrayobject.h>
29 #include <numpy/arrayscalars.h>
30
31 #include "arrow/array.h"
32 #include "arrow/array/builder_binary.h"
33 #include "arrow/array/builder_nested.h"
34 #include "arrow/array/builder_primitive.h"
35 #include "arrow/array/builder_union.h"
36 #include "arrow/io/interfaces.h"
37 #include "arrow/io/memory.h"
38 #include "arrow/ipc/util.h"
39 #include "arrow/ipc/writer.h"
40 #include "arrow/record_batch.h"
41 #include "arrow/result.h"
42 #include "arrow/tensor.h"
43 #include "arrow/util/logging.h"
44
45 #include "arrow/python/common.h"
46 #include "arrow/python/datetime.h"
47 #include "arrow/python/helpers.h"
48 #include "arrow/python/iterators.h"
49 #include "arrow/python/numpy_convert.h"
50 #include "arrow/python/platform.h"
51 #include "arrow/python/pyarrow.h"
52
53 constexpr int32_t kMaxRecursionDepth = 100;
54
55 namespace arrow {
56
57 using internal::checked_cast;
58
59 namespace py {
60
61 class SequenceBuilder;
62 class DictBuilder;
63
64 Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
65 int32_t recursion_depth, SerializedPyObject* blobs_out);
66
67 // A Sequence is a heterogeneous collections of elements. It can contain
68 // scalar Python types, lists, tuples, dictionaries, tensors and sparse tensors.
69 class SequenceBuilder {
70 public:
71 explicit SequenceBuilder(MemoryPool* pool = default_memory_pool())
72 : pool_(pool),
73 types_(::arrow::int8(), pool),
74 offsets_(::arrow::int32(), pool),
75 type_map_(PythonType::NUM_PYTHON_TYPES, -1) {
76 auto null_builder = std::make_shared<NullBuilder>(pool);
77 auto initial_ty = dense_union({field("0", null())});
78 builder_.reset(new DenseUnionBuilder(pool, {null_builder}, initial_ty));
79 }
80
81 // Appending a none to the sequence
82 Status AppendNone() { return builder_->AppendNull(); }
83
84 template <typename BuilderType, typename MakeBuilderFn>
85 Status CreateAndUpdate(std::shared_ptr<BuilderType>* child_builder, int8_t tag,
86 MakeBuilderFn make_builder) {
87 if (!*child_builder) {
88 child_builder->reset(make_builder());
89 std::ostringstream convert;
90 convert.imbue(std::locale::classic());
91 convert << static_cast<int>(tag);
92 type_map_[tag] = builder_->AppendChild(*child_builder, convert.str());
93 }
94 return builder_->Append(type_map_[tag]);
95 }
96
97 template <typename BuilderType, typename T>
98 Status AppendPrimitive(std::shared_ptr<BuilderType>* child_builder, const T val,
99 int8_t tag) {
100 RETURN_NOT_OK(
101 CreateAndUpdate(child_builder, tag, [this]() { return new BuilderType(pool_); }));
102 return (*child_builder)->Append(val);
103 }
104
105 // Appending a boolean to the sequence
106 Status AppendBool(const bool data) {
107 return AppendPrimitive(&bools_, data, PythonType::BOOL);
108 }
109
110 // Appending an int64_t to the sequence
111 Status AppendInt64(const int64_t data) {
112 return AppendPrimitive(&ints_, data, PythonType::INT);
113 }
114
115 // Append a list of bytes to the sequence
116 Status AppendBytes(const uint8_t* data, int32_t length) {
117 RETURN_NOT_OK(CreateAndUpdate(&bytes_, PythonType::BYTES,
118 [this]() { return new BinaryBuilder(pool_); }));
119 return bytes_->Append(data, length);
120 }
121
122 // Appending a string to the sequence
123 Status AppendString(const char* data, int32_t length) {
124 RETURN_NOT_OK(CreateAndUpdate(&strings_, PythonType::STRING,
125 [this]() { return new StringBuilder(pool_); }));
126 return strings_->Append(data, length);
127 }
128
129 // Appending a half_float to the sequence
130 Status AppendHalfFloat(const npy_half data) {
131 return AppendPrimitive(&half_floats_, data, PythonType::HALF_FLOAT);
132 }
133
134 // Appending a float to the sequence
135 Status AppendFloat(const float data) {
136 return AppendPrimitive(&floats_, data, PythonType::FLOAT);
137 }
138
139 // Appending a double to the sequence
140 Status AppendDouble(const double data) {
141 return AppendPrimitive(&doubles_, data, PythonType::DOUBLE);
142 }
143
144 // Appending a Date64 timestamp to the sequence
145 Status AppendDate64(const int64_t timestamp) {
146 return AppendPrimitive(&date64s_, timestamp, PythonType::DATE64);
147 }
148
149 // Appending a tensor to the sequence
150 //
151 // \param tensor_index Index of the tensor in the object.
152 Status AppendTensor(const int32_t tensor_index) {
153 RETURN_NOT_OK(CreateAndUpdate(&tensor_indices_, PythonType::TENSOR,
154 [this]() { return new Int32Builder(pool_); }));
155 return tensor_indices_->Append(tensor_index);
156 }
157
158 // Appending a sparse coo tensor to the sequence
159 //
160 // \param sparse_coo_tensor_index Index of the sparse coo tensor in the object.
161 Status AppendSparseCOOTensor(const int32_t sparse_coo_tensor_index) {
162 RETURN_NOT_OK(CreateAndUpdate(&sparse_coo_tensor_indices_,
163 PythonType::SPARSECOOTENSOR,
164 [this]() { return new Int32Builder(pool_); }));
165 return sparse_coo_tensor_indices_->Append(sparse_coo_tensor_index);
166 }
167
168 // Appending a sparse csr matrix to the sequence
169 //
170 // \param sparse_csr_matrix_index Index of the sparse csr matrix in the object.
171 Status AppendSparseCSRMatrix(const int32_t sparse_csr_matrix_index) {
172 RETURN_NOT_OK(CreateAndUpdate(&sparse_csr_matrix_indices_,
173 PythonType::SPARSECSRMATRIX,
174 [this]() { return new Int32Builder(pool_); }));
175 return sparse_csr_matrix_indices_->Append(sparse_csr_matrix_index);
176 }
177
178 // Appending a sparse csc matrix to the sequence
179 //
180 // \param sparse_csc_matrix_index Index of the sparse csc matrix in the object.
181 Status AppendSparseCSCMatrix(const int32_t sparse_csc_matrix_index) {
182 RETURN_NOT_OK(CreateAndUpdate(&sparse_csc_matrix_indices_,
183 PythonType::SPARSECSCMATRIX,
184 [this]() { return new Int32Builder(pool_); }));
185 return sparse_csc_matrix_indices_->Append(sparse_csc_matrix_index);
186 }
187
188 // Appending a sparse csf tensor to the sequence
189 //
190 // \param sparse_csf_tensor_index Index of the sparse csf tensor in the object.
191 Status AppendSparseCSFTensor(const int32_t sparse_csf_tensor_index) {
192 RETURN_NOT_OK(CreateAndUpdate(&sparse_csf_tensor_indices_,
193 PythonType::SPARSECSFTENSOR,
194 [this]() { return new Int32Builder(pool_); }));
195 return sparse_csf_tensor_indices_->Append(sparse_csf_tensor_index);
196 }
197
198 // Appending a numpy ndarray to the sequence
199 //
200 // \param tensor_index Index of the tensor in the object.
201 Status AppendNdarray(const int32_t ndarray_index) {
202 RETURN_NOT_OK(CreateAndUpdate(&ndarray_indices_, PythonType::NDARRAY,
203 [this]() { return new Int32Builder(pool_); }));
204 return ndarray_indices_->Append(ndarray_index);
205 }
206
207 // Appending a buffer to the sequence
208 //
209 // \param buffer_index Index of the buffer in the object.
210 Status AppendBuffer(const int32_t buffer_index) {
211 RETURN_NOT_OK(CreateAndUpdate(&buffer_indices_, PythonType::BUFFER,
212 [this]() { return new Int32Builder(pool_); }));
213 return buffer_indices_->Append(buffer_index);
214 }
215
216 Status AppendSequence(PyObject* context, PyObject* sequence, int8_t tag,
217 std::shared_ptr<ListBuilder>& target_sequence,
218 std::unique_ptr<SequenceBuilder>& values, int32_t recursion_depth,
219 SerializedPyObject* blobs_out) {
220 if (recursion_depth >= kMaxRecursionDepth) {
221 return Status::NotImplemented(
222 "This object exceeds the maximum recursion depth. It may contain itself "
223 "recursively.");
224 }
225 RETURN_NOT_OK(CreateAndUpdate(&target_sequence, tag, [this, &values]() {
226 values.reset(new SequenceBuilder(pool_));
227 return new ListBuilder(pool_, values->builder());
228 }));
229 RETURN_NOT_OK(target_sequence->Append());
230 return internal::VisitIterable(
231 sequence, [&](PyObject* obj, bool* keep_going /* unused */) {
232 return Append(context, obj, values.get(), recursion_depth, blobs_out);
233 });
234 }
235
236 Status AppendList(PyObject* context, PyObject* list, int32_t recursion_depth,
237 SerializedPyObject* blobs_out) {
238 return AppendSequence(context, list, PythonType::LIST, lists_, list_values_,
239 recursion_depth + 1, blobs_out);
240 }
241
242 Status AppendTuple(PyObject* context, PyObject* tuple, int32_t recursion_depth,
243 SerializedPyObject* blobs_out) {
244 return AppendSequence(context, tuple, PythonType::TUPLE, tuples_, tuple_values_,
245 recursion_depth + 1, blobs_out);
246 }
247
248 Status AppendSet(PyObject* context, PyObject* set, int32_t recursion_depth,
249 SerializedPyObject* blobs_out) {
250 return AppendSequence(context, set, PythonType::SET, sets_, set_values_,
251 recursion_depth + 1, blobs_out);
252 }
253
254 Status AppendDict(PyObject* context, PyObject* dict, int32_t recursion_depth,
255 SerializedPyObject* blobs_out);
256
257 // Finish building the sequence and return the result.
258 // Input arrays may be nullptr
259 Status Finish(std::shared_ptr<Array>* out) { return builder_->Finish(out); }
260
261 std::shared_ptr<DenseUnionBuilder> builder() { return builder_; }
262
263 private:
264 MemoryPool* pool_;
265
266 Int8Builder types_;
267 Int32Builder offsets_;
268
269 /// Mapping from PythonType to child index
270 std::vector<int8_t> type_map_;
271
272 std::shared_ptr<BooleanBuilder> bools_;
273 std::shared_ptr<Int64Builder> ints_;
274 std::shared_ptr<BinaryBuilder> bytes_;
275 std::shared_ptr<StringBuilder> strings_;
276 std::shared_ptr<HalfFloatBuilder> half_floats_;
277 std::shared_ptr<FloatBuilder> floats_;
278 std::shared_ptr<DoubleBuilder> doubles_;
279 std::shared_ptr<Date64Builder> date64s_;
280
281 std::unique_ptr<SequenceBuilder> list_values_;
282 std::shared_ptr<ListBuilder> lists_;
283 std::unique_ptr<DictBuilder> dict_values_;
284 std::shared_ptr<ListBuilder> dicts_;
285 std::unique_ptr<SequenceBuilder> tuple_values_;
286 std::shared_ptr<ListBuilder> tuples_;
287 std::unique_ptr<SequenceBuilder> set_values_;
288 std::shared_ptr<ListBuilder> sets_;
289
290 std::shared_ptr<Int32Builder> tensor_indices_;
291 std::shared_ptr<Int32Builder> sparse_coo_tensor_indices_;
292 std::shared_ptr<Int32Builder> sparse_csr_matrix_indices_;
293 std::shared_ptr<Int32Builder> sparse_csc_matrix_indices_;
294 std::shared_ptr<Int32Builder> sparse_csf_tensor_indices_;
295 std::shared_ptr<Int32Builder> ndarray_indices_;
296 std::shared_ptr<Int32Builder> buffer_indices_;
297
298 std::shared_ptr<DenseUnionBuilder> builder_;
299 };
300
301 // Constructing dictionaries of key/value pairs. Sequences of
302 // keys and values are built separately using a pair of
303 // SequenceBuilders. The resulting Arrow representation
304 // can be obtained via the Finish method.
305 class DictBuilder {
306 public:
307 explicit DictBuilder(MemoryPool* pool = nullptr) : keys_(pool), vals_(pool) {
308 builder_.reset(new StructBuilder(struct_({field("keys", dense_union(FieldVector{})),
309 field("vals", dense_union(FieldVector{}))}),
310 pool, {keys_.builder(), vals_.builder()}));
311 }
312
313 // Builder for the keys of the dictionary
314 SequenceBuilder& keys() { return keys_; }
315 // Builder for the values of the dictionary
316 SequenceBuilder& vals() { return vals_; }
317
318 // Construct an Arrow StructArray representing the dictionary.
319 // Contains a field "keys" for the keys and "vals" for the values.
320 Status Finish(std::shared_ptr<Array>* out) { return builder_->Finish(out); }
321
322 std::shared_ptr<StructBuilder> builder() { return builder_; }
323
324 private:
325 SequenceBuilder keys_;
326 SequenceBuilder vals_;
327 std::shared_ptr<StructBuilder> builder_;
328 };
329
// Serialize a Python dict as a list-of-struct child of the union.
// Each entry becomes one struct slot with its key and value serialized
// recursively into the "keys"/"vals" SequenceBuilders of dict_values_.
Status SequenceBuilder::AppendDict(PyObject* context, PyObject* dict,
                                   int32_t recursion_depth,
                                   SerializedPyObject* blobs_out) {
  // Guard against self-referential objects (a dict containing itself would
  // otherwise recurse forever).
  if (recursion_depth >= kMaxRecursionDepth) {
    return Status::NotImplemented(
        "This object exceeds the maximum recursion depth. It may contain itself "
        "recursively.");
  }
  // Lazily create the DICT child: a ListBuilder over the DictBuilder's
  // struct of key/value unions.
  RETURN_NOT_OK(CreateAndUpdate(&dicts_, PythonType::DICT, [this]() {
    dict_values_.reset(new DictBuilder(pool_));
    return new ListBuilder(pool_, dict_values_->builder());
  }));
  RETURN_NOT_OK(dicts_->Append());
  PyObject* key;
  PyObject* value;
  Py_ssize_t pos = 0;
  // PyDict_Next yields borrowed references; no refcounting needed here.
  while (PyDict_Next(dict, &pos, &key, &value)) {
    RETURN_NOT_OK(dict_values_->builder()->Append());
    RETURN_NOT_OK(
        Append(context, key, &dict_values_->keys(), recursion_depth + 1, blobs_out));
    RETURN_NOT_OK(
        Append(context, value, &dict_values_->vals(), recursion_depth + 1, blobs_out));
  }

  // This block is used to decrement the reference counts of the results
  // returned by the serialization callback, which is called in AppendArray,
  // in DeserializeDict and in Append.  A dict carrying the "_pytype_" marker
  // key is assumed to be such a callback result, which this function owns.
  // NOTE: py_type is a deliberately leaked process-lifetime cache of the
  // "_pytype_" string object (created once, never released).
  static PyObject* py_type = PyUnicode_FromString("_pytype_");
  if (PyDict_Contains(dict, py_type)) {
    // If the dictionary contains the key "_pytype_", then the user has to
    // have registered a callback.
    if (context == Py_None) {
      return Status::Invalid("No serialization callback set");
    }
    // Release our ownership of the callback-produced dict.
    Py_XDECREF(dict);
  }
  return Status::OK();
}
368
369 Status CallCustomCallback(PyObject* context, PyObject* method_name, PyObject* elem,
370 PyObject** result) {
371 if (context == Py_None) {
372 *result = NULL;
373 return Status::SerializationError("error while calling callback on ",
374 internal::PyObject_StdStringRepr(elem),
375 ": handler not registered");
376 } else {
377 *result = PyObject_CallMethodObjArgs(context, method_name, elem, NULL);
378 return CheckPyError();
379 }
380 }
381
382 Status CallSerializeCallback(PyObject* context, PyObject* value,
383 PyObject** serialized_object) {
384 OwnedRef method_name(PyUnicode_FromString("_serialize_callback"));
385 RETURN_NOT_OK(CallCustomCallback(context, method_name.obj(), value, serialized_object));
386 if (!PyDict_Check(*serialized_object)) {
387 return Status::TypeError("serialization callback must return a valid dictionary");
388 }
389 return Status::OK();
390 }
391
// Run the registered "_deserialize_callback" on `value`.
// On success *deserialized_object holds a new reference to the callback's
// result; no further validation of the result is performed here.
Status CallDeserializeCallback(PyObject* context, PyObject* value,
                               PyObject** deserialized_object) {
  OwnedRef method_name(PyUnicode_FromString("_deserialize_callback"));
  return CallCustomCallback(context, method_name.obj(), value, deserialized_object);
}
397
398 Status AppendArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder,
399 int32_t recursion_depth, SerializedPyObject* blobs_out);
400
401 template <typename NumpyScalarObject>
402 Status AppendIntegerScalar(PyObject* obj, SequenceBuilder* builder) {
403 int64_t value = reinterpret_cast<NumpyScalarObject*>(obj)->obval;
404 return builder->AppendInt64(value);
405 }
406
407 // Append a potentially 64-bit wide unsigned Numpy scalar.
408 // Must check for overflow as we reinterpret it as signed int64.
409 template <typename NumpyScalarObject>
410 Status AppendLargeUnsignedScalar(PyObject* obj, SequenceBuilder* builder) {
411 constexpr uint64_t max_value = std::numeric_limits<int64_t>::max();
412
413 uint64_t value = reinterpret_cast<NumpyScalarObject*>(obj)->obval;
414 if (value > max_value) {
415 return Status::Invalid("cannot serialize Numpy uint64 scalar >= 2**63");
416 }
417 return builder->AppendInt64(static_cast<int64_t>(value));
418 }
419
// Dispatch a Numpy scalar object to the matching SequenceBuilder append.
// Floating-point kinds keep their width (half/float/double); all integral
// kinds are widened to int64, with the 64-bit unsigned kinds checked for
// overflow by AppendLargeUnsignedScalar.
Status AppendScalar(PyObject* obj, SequenceBuilder* builder) {
  if (PyArray_IsScalar(obj, Bool)) {
    return builder->AppendBool(reinterpret_cast<PyBoolScalarObject*>(obj)->obval != 0);
  } else if (PyArray_IsScalar(obj, Half)) {
    return builder->AppendHalfFloat(reinterpret_cast<PyHalfScalarObject*>(obj)->obval);
  } else if (PyArray_IsScalar(obj, Float)) {
    return builder->AppendFloat(reinterpret_cast<PyFloatScalarObject*>(obj)->obval);
  } else if (PyArray_IsScalar(obj, Double)) {
    return builder->AppendDouble(reinterpret_cast<PyDoubleScalarObject*>(obj)->obval);
  }
  // Integral kinds below.  Note the C-named kinds (Byte/Short/Int/Long/
  // LongLong) and the sized kinds (Int64/UInt64) are distinct Numpy scalar
  // checks, so both spellings are listed.
  if (PyArray_IsScalar(obj, Byte)) {
    return AppendIntegerScalar<PyByteScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, Short)) {
    return AppendIntegerScalar<PyShortScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, Int)) {
    return AppendIntegerScalar<PyIntScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, Long)) {
    return AppendIntegerScalar<PyLongScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, LongLong)) {
    return AppendIntegerScalar<PyLongLongScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, Int64)) {
    return AppendIntegerScalar<PyInt64ScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, UByte)) {
    return AppendIntegerScalar<PyUByteScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, UShort)) {
    return AppendIntegerScalar<PyUShortScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, UInt)) {
    return AppendIntegerScalar<PyUIntScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, ULong)) {
    // ULong may be 64-bit on this platform, so route through the
    // overflow-checked path.
    return AppendLargeUnsignedScalar<PyULongScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, ULongLong)) {
    return AppendLargeUnsignedScalar<PyULongLongScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, UInt64)) {
    return AppendLargeUnsignedScalar<PyUInt64ScalarObject>(obj, builder);
  }
  return Status::NotImplemented("Numpy scalar type not recognized");
}
457
// Serialize a single Python object into `builder`, dispatching on its
// runtime type.  Out-of-band payloads (tensors, sparse tensors, ndarrays,
// buffers) are appended to `blobs_out` and only their index is recorded in
// the sequence.  Unrecognized types fall back to the user-registered
// serialization callback.
Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
              int32_t recursion_depth, SerializedPyObject* blobs_out) {
  // The bool case must precede the int case (PyInt_Check passes for bools)
  if (PyBool_Check(elem)) {
    RETURN_NOT_OK(builder->AppendBool(elem == Py_True));
  } else if (PyArray_DescrFromScalar(elem)->type_num == NPY_HALF) {
    // NOTE(review): PyArray_DescrFromScalar is queried here for arbitrary
    // objects, before the PyArray_IsScalar(elem, Generic) branch below —
    // presumably because np.float16 would otherwise lose precision via
    // PyFloat_Check; confirm it is well-defined for non-scalar inputs.
    npy_half halffloat = reinterpret_cast<PyHalfScalarObject*>(elem)->obval;
    RETURN_NOT_OK(builder->AppendHalfFloat(halffloat));
  } else if (PyFloat_Check(elem)) {
    RETURN_NOT_OK(builder->AppendDouble(PyFloat_AS_DOUBLE(elem)));
  } else if (PyLong_Check(elem)) {
    int overflow = 0;
    int64_t data = PyLong_AsLongLongAndOverflow(elem, &overflow);
    if (!overflow) {
      RETURN_NOT_OK(builder->AppendInt64(data));
    } else {
      // Integer does not fit in int64: attempt to serialize the object using
      // the custom callback.
      PyObject* serialized_object;
      // The reference count of serialized_object will be decremented in
      // SequenceBuilder::AppendDict (it carries the "_pytype_" marker key).
      RETURN_NOT_OK(CallSerializeCallback(context, elem, &serialized_object));
      RETURN_NOT_OK(
          builder->AppendDict(context, serialized_object, recursion_depth, blobs_out));
    }
  } else if (PyBytes_Check(elem)) {
    auto data = reinterpret_cast<uint8_t*>(PyBytes_AS_STRING(elem));
    int32_t size = -1;
    // CastSize guards against payloads longer than INT32_MAX.
    RETURN_NOT_OK(internal::CastSize(PyBytes_GET_SIZE(elem), &size));
    RETURN_NOT_OK(builder->AppendBytes(data, size));
  } else if (PyUnicode_Check(elem)) {
    ARROW_ASSIGN_OR_RAISE(auto view, PyBytesView::FromUnicode(elem));
    int32_t size = -1;
    RETURN_NOT_OK(internal::CastSize(view.size, &size));
    RETURN_NOT_OK(builder->AppendString(view.bytes, size));
  } else if (PyList_CheckExact(elem)) {
    RETURN_NOT_OK(builder->AppendList(context, elem, recursion_depth, blobs_out));
  } else if (PyDict_CheckExact(elem)) {
    RETURN_NOT_OK(builder->AppendDict(context, elem, recursion_depth, blobs_out));
  } else if (PyTuple_CheckExact(elem)) {
    RETURN_NOT_OK(builder->AppendTuple(context, elem, recursion_depth, blobs_out));
  } else if (PySet_Check(elem)) {
    RETURN_NOT_OK(builder->AppendSet(context, elem, recursion_depth, blobs_out));
  } else if (PyArray_IsScalar(elem, Generic)) {
    RETURN_NOT_OK(AppendScalar(elem, builder));
  } else if (PyArray_CheckExact(elem)) {
    RETURN_NOT_OK(AppendArray(context, reinterpret_cast<PyArrayObject*>(elem), builder,
                              recursion_depth, blobs_out));
  } else if (elem == Py_None) {
    RETURN_NOT_OK(builder->AppendNone());
  } else if (PyDateTime_Check(elem)) {
    // Datetimes are stored as microseconds in a Date64 column.
    PyDateTime_DateTime* datetime = reinterpret_cast<PyDateTime_DateTime*>(elem);
    RETURN_NOT_OK(builder->AppendDate64(internal::PyDateTime_to_us(datetime)));
  } else if (is_buffer(elem)) {
    // For the wrapped Arrow types below, the index appended to the sequence
    // is the payload's position in the corresponding blobs_out vector, so
    // the Append* call must happen before the push_back.
    RETURN_NOT_OK(builder->AppendBuffer(static_cast<int32_t>(blobs_out->buffers.size())));
    ARROW_ASSIGN_OR_RAISE(auto buffer, unwrap_buffer(elem));
    blobs_out->buffers.push_back(buffer);
  } else if (is_tensor(elem)) {
    RETURN_NOT_OK(builder->AppendTensor(static_cast<int32_t>(blobs_out->tensors.size())));
    ARROW_ASSIGN_OR_RAISE(auto tensor, unwrap_tensor(elem));
    blobs_out->tensors.push_back(tensor);
  } else if (is_sparse_coo_tensor(elem)) {
    RETURN_NOT_OK(builder->AppendSparseCOOTensor(
        static_cast<int32_t>(blobs_out->sparse_tensors.size())));
    ARROW_ASSIGN_OR_RAISE(auto tensor, unwrap_sparse_coo_tensor(elem));
    blobs_out->sparse_tensors.push_back(tensor);
  } else if (is_sparse_csr_matrix(elem)) {
    RETURN_NOT_OK(builder->AppendSparseCSRMatrix(
        static_cast<int32_t>(blobs_out->sparse_tensors.size())));
    ARROW_ASSIGN_OR_RAISE(auto matrix, unwrap_sparse_csr_matrix(elem));
    blobs_out->sparse_tensors.push_back(matrix);
  } else if (is_sparse_csc_matrix(elem)) {
    RETURN_NOT_OK(builder->AppendSparseCSCMatrix(
        static_cast<int32_t>(blobs_out->sparse_tensors.size())));
    ARROW_ASSIGN_OR_RAISE(auto matrix, unwrap_sparse_csc_matrix(elem));
    blobs_out->sparse_tensors.push_back(matrix);
  } else if (is_sparse_csf_tensor(elem)) {
    RETURN_NOT_OK(builder->AppendSparseCSFTensor(
        static_cast<int32_t>(blobs_out->sparse_tensors.size())));
    ARROW_ASSIGN_OR_RAISE(auto tensor, unwrap_sparse_csf_tensor(elem));
    blobs_out->sparse_tensors.push_back(tensor);
  } else {
    // Attempt to serialize the object using the custom callback.
    PyObject* serialized_object;
    // The reference count of serialized_object will be decremented in
    // SequenceBuilder::AppendDict (it carries the "_pytype_" marker key).
    RETURN_NOT_OK(CallSerializeCallback(context, elem, &serialized_object));
    RETURN_NOT_OK(
        builder->AppendDict(context, serialized_object, recursion_depth, blobs_out));
  }
  return Status::OK();
}
547
// Serialize a numpy ndarray.  Arrays of the dtypes listed below are stored
// zero-copy as Arrow tensors in blobs_out->ndarrays; any other dtype is
// handed to the user serialization callback and stored as a dict.
Status AppendArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder,
                   int32_t recursion_depth, SerializedPyObject* blobs_out) {
  int dtype = PyArray_TYPE(array);
  switch (dtype) {
    case NPY_UINT8:
    case NPY_INT8:
    case NPY_UINT16:
    case NPY_INT16:
    case NPY_UINT32:
    case NPY_INT32:
    case NPY_UINT64:
    case NPY_INT64:
    case NPY_HALF:
    case NPY_FLOAT:
    case NPY_DOUBLE: {
      // Record the index first: it must match the position the tensor takes
      // in blobs_out->ndarrays below.
      RETURN_NOT_OK(
          builder->AppendNdarray(static_cast<int32_t>(blobs_out->ndarrays.size())));
      std::shared_ptr<Tensor> tensor;
      RETURN_NOT_OK(NdarrayToTensor(default_memory_pool(),
                                    reinterpret_cast<PyObject*>(array), {}, &tensor));
      blobs_out->ndarrays.push_back(tensor);
    } break;
    default: {
      PyObject* serialized_object;
      // The reference count of serialized_object will be decremented in
      // SequenceBuilder::AppendDict (it carries the "_pytype_" marker key).
      RETURN_NOT_OK(CallSerializeCallback(context, reinterpret_cast<PyObject*>(array),
                                          &serialized_object));
      RETURN_NOT_OK(builder->AppendDict(context, serialized_object, recursion_depth + 1,
                                        blobs_out));
    }
  }
  return Status::OK();
}
581
582 std::shared_ptr<RecordBatch> MakeBatch(std::shared_ptr<Array> data) {
583 auto field = std::make_shared<Field>("list", data->type());
584 auto schema = ::arrow::schema({field});
585 return RecordBatch::Make(schema, data->length(), {data});
586 }
587
// Serialize an iterable of Python objects into `out`.
// `context` is the (possibly Py_None) holder of the user serialization
// callbacks; the resulting union array is wrapped into out->batch, while
// tensors/buffers referenced by index land in out's side vectors.
Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject* out) {
  // Appending inspects Python objects throughout, so hold the GIL for the
  // entire traversal.
  PyAcquireGIL lock;
  SequenceBuilder builder;
  RETURN_NOT_OK(internal::VisitIterable(
      sequence, [&](PyObject* obj, bool* keep_going /* unused */) {
        return Append(context, obj, &builder, 0, out);
      }));
  std::shared_ptr<Array> array;
  RETURN_NOT_OK(builder.Finish(&array));
  out->batch = MakeBatch(array);
  return Status::OK();
}
600
601 Status SerializeNdarray(std::shared_ptr<Tensor> tensor, SerializedPyObject* out) {
602 std::shared_ptr<Array> array;
603 SequenceBuilder builder;
604 RETURN_NOT_OK(builder.AppendNdarray(static_cast<int32_t>(out->ndarrays.size())));
605 out->ndarrays.push_back(tensor);
606 RETURN_NOT_OK(builder.Finish(&array));
607 out->batch = MakeBatch(array);
608 return Status::OK();
609 }
610
611 Status WriteNdarrayHeader(std::shared_ptr<DataType> dtype,
612 const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
613 io::OutputStream* dst) {
614 auto empty_tensor = std::make_shared<Tensor>(
615 dtype, std::make_shared<Buffer>(nullptr, tensor_num_bytes), shape);
616 SerializedPyObject serialized_tensor;
617 RETURN_NOT_OK(SerializeNdarray(empty_tensor, &serialized_tensor));
618 return serialized_tensor.WriteTo(dst);
619 }
620
// Default-construct with the standard IPC write options.
SerializedPyObject::SerializedPyObject()
    : ipc_options(ipc::IpcWriteOptions::Defaults()) {}
623
// Write the full serialized object to `dst`.
// Layout: four raw int32 counts (tensors, sparse tensors, ndarrays,
// buffers, in native byte order), the record batch as an IPC stream, then
// each tensor/sparse-tensor/ndarray payload 64-byte aligned, and finally
// each raw buffer prefixed by its int64 size.  This layout must stay in
// sync with the corresponding read path.
Status SerializedPyObject::WriteTo(io::OutputStream* dst) {
  int32_t num_tensors = static_cast<int32_t>(this->tensors.size());
  int32_t num_sparse_tensors = static_cast<int32_t>(this->sparse_tensors.size());
  int32_t num_ndarrays = static_cast<int32_t>(this->ndarrays.size());
  int32_t num_buffers = static_cast<int32_t>(this->buffers.size());
  RETURN_NOT_OK(
      dst->Write(reinterpret_cast<const uint8_t*>(&num_tensors), sizeof(int32_t)));
  RETURN_NOT_OK(
      dst->Write(reinterpret_cast<const uint8_t*>(&num_sparse_tensors), sizeof(int32_t)));
  RETURN_NOT_OK(
      dst->Write(reinterpret_cast<const uint8_t*>(&num_ndarrays), sizeof(int32_t)));
  RETURN_NOT_OK(
      dst->Write(reinterpret_cast<const uint8_t*>(&num_buffers), sizeof(int32_t)));

  // Align stream to 8-byte offset
  RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kArrowIpcAlignment));
  RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch}, this->ipc_options, dst));

  // Align stream to 64-byte offset so tensor bodies are 64-byte aligned
  RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));

  // metadata_length/body_length are required out-parameters of the IPC
  // writers; their values are not used here.
  int32_t metadata_length;
  int64_t body_length;
  for (const auto& tensor : this->tensors) {
    RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length));
    RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
  }

  for (const auto& sparse_tensor : this->sparse_tensors) {
    RETURN_NOT_OK(
        ipc::WriteSparseTensor(*sparse_tensor, dst, &metadata_length, &body_length));
    RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
  }

  for (const auto& tensor : this->ndarrays) {
    RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length));
    RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
  }

  // Raw buffers are written unaligned, each prefixed by its int64 size.
  for (const auto& buffer : this->buffers) {
    int64_t size = buffer->size();
    RETURN_NOT_OK(dst->Write(reinterpret_cast<const uint8_t*>(&size), sizeof(int64_t)));
    RETURN_NOT_OK(dst->Write(buffer->data(), size));
  }

  return Status::OK();
}
671
672 namespace {
673
674 Status CountSparseTensors(
675 const std::vector<std::shared_ptr<SparseTensor>>& sparse_tensors, PyObject** out) {
676 OwnedRef num_sparse_tensors(PyDict_New());
677 size_t num_coo = 0;
678 size_t num_csr = 0;
679 size_t num_csc = 0;
680 size_t num_csf = 0;
681 size_t ndim_csf = 0;
682
683 for (const auto& sparse_tensor : sparse_tensors) {
684 switch (sparse_tensor->format_id()) {
685 case SparseTensorFormat::COO:
686 ++num_coo;
687 break;
688 case SparseTensorFormat::CSR:
689 ++num_csr;
690 break;
691 case SparseTensorFormat::CSC:
692 ++num_csc;
693 break;
694 case SparseTensorFormat::CSF:
695 ++num_csf;
696 ndim_csf += sparse_tensor->ndim();
697 break;
698 }
699 }
700
701 PyDict_SetItemString(num_sparse_tensors.obj(), "coo", PyLong_FromSize_t(num_coo));
702 PyDict_SetItemString(num_sparse_tensors.obj(), "csr", PyLong_FromSize_t(num_csr));
703 PyDict_SetItemString(num_sparse_tensors.obj(), "csc", PyLong_FromSize_t(num_csc));
704 PyDict_SetItemString(num_sparse_tensors.obj(), "csf", PyLong_FromSize_t(num_csf));
705 PyDict_SetItemString(num_sparse_tensors.obj(), "ndim_csf", PyLong_FromSize_t(ndim_csf));
706 RETURN_IF_PYERROR();
707
708 *out = num_sparse_tensors.detach();
709 return Status::OK();
710 }
711
712 } // namespace
713
714 Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out) {
715 PyAcquireGIL py_gil;
716
717 OwnedRef result(PyDict_New());
718 PyObject* buffers = PyList_New(0);
719 PyObject* num_sparse_tensors = nullptr;
720
721 // TODO(wesm): Not sure how pedantic we need to be about checking the return
722 // values of these functions. There are other places where we do not check
723 // PyDict_SetItem/SetItemString return value, but these failures would be
724 // quite esoteric
725 PyDict_SetItemString(result.obj(), "num_tensors",
726 PyLong_FromSize_t(this->tensors.size()));
727 RETURN_NOT_OK(CountSparseTensors(this->sparse_tensors, &num_sparse_tensors));
728 PyDict_SetItemString(result.obj(), "num_sparse_tensors", num_sparse_tensors);
729 PyDict_SetItemString(result.obj(), "ndim_csf", num_sparse_tensors);
730 PyDict_SetItemString(result.obj(), "num_ndarrays",
731 PyLong_FromSize_t(this->ndarrays.size()));
732 PyDict_SetItemString(result.obj(), "num_buffers",
733 PyLong_FromSize_t(this->buffers.size()));
734 PyDict_SetItemString(result.obj(), "data", buffers);
735 RETURN_IF_PYERROR();
736
737 Py_DECREF(buffers);
738
739 auto PushBuffer = [&buffers](const std::shared_ptr<Buffer>& buffer) {
740 PyObject* wrapped_buffer = wrap_buffer(buffer);
741 RETURN_IF_PYERROR();
742 if (PyList_Append(buffers, wrapped_buffer) < 0) {
743 Py_DECREF(wrapped_buffer);
744 RETURN_IF_PYERROR();
745 }
746 Py_DECREF(wrapped_buffer);
747 return Status::OK();
748 };
749
750 constexpr int64_t kInitialCapacity = 1024;
751
752 // Write the record batch describing the object structure
753 py_gil.release();
754 ARROW_ASSIGN_OR_RAISE(auto stream,
755 io::BufferOutputStream::Create(kInitialCapacity, memory_pool));
756 RETURN_NOT_OK(
757 ipc::WriteRecordBatchStream({this->batch}, this->ipc_options, stream.get()));
758 ARROW_ASSIGN_OR_RAISE(auto buffer, stream->Finish());
759 py_gil.acquire();
760
761 RETURN_NOT_OK(PushBuffer(buffer));
762
763 // For each tensor, get a metadata buffer and a buffer for the body
764 for (const auto& tensor : this->tensors) {
765 ARROW_ASSIGN_OR_RAISE(std::unique_ptr<ipc::Message> message,
766 ipc::GetTensorMessage(*tensor, memory_pool));
767 RETURN_NOT_OK(PushBuffer(message->metadata()));
768 RETURN_NOT_OK(PushBuffer(message->body()));
769 }
770
771 // For each sparse tensor, get a metadata buffer and buffers containing index and data
772 for (const auto& sparse_tensor : this->sparse_tensors) {
773 ipc::IpcPayload payload;
774 RETURN_NOT_OK(ipc::GetSparseTensorPayload(*sparse_tensor, memory_pool, &payload));
775 RETURN_NOT_OK(PushBuffer(payload.metadata));
776 for (const auto& body : payload.body_buffers) {
777 RETURN_NOT_OK(PushBuffer(body));
778 }
779 }
780
781 // For each ndarray, get a metadata buffer and a buffer for the body
782 for (const auto& ndarray : this->ndarrays) {
783 ARROW_ASSIGN_OR_RAISE(std::unique_ptr<ipc::Message> message,
784 ipc::GetTensorMessage(*ndarray, memory_pool));
785 RETURN_NOT_OK(PushBuffer(message->metadata()));
786 RETURN_NOT_OK(PushBuffer(message->body()));
787 }
788
789 for (const auto& buf : this->buffers) {
790 RETURN_NOT_OK(PushBuffer(buf));
791 }
792
793 *out = result.detach();
794 return Status::OK();
795 }
796
797 } // namespace py
798 } // namespace arrow