]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/python/pyarrow/serialization.pxi
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / python / pyarrow / serialization.pxi
1 # Licensed to the Apache Software Foundation (ASF) under one
2 # or more contributor license agreements. See the NOTICE file
3 # distributed with this work for additional information
4 # regarding copyright ownership. The ASF licenses this file
5 # to you under the Apache License, Version 2.0 (the
6 # "License"); you may not use this file except in compliance
7 # with the License. You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing,
12 # software distributed under the License is distributed on an
13 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 # KIND, either express or implied. See the License for the
15 # specific language governing permissions and limitations
16 # under the License.
17
18 from cpython.ref cimport PyObject
19
20 import warnings
21
22
def _deprecate_serialization(name):
    """
    Emit the deprecation warning for a legacy serialization entry point.

    Parameters
    ----------
    name : str
        Name of the deprecated attribute of the ``pyarrow`` namespace; it is
        substituted into the warning text.
    """
    template = (
        "'pyarrow.{}' is deprecated as of 2.0.0 and will be removed in a "
        "future version. Use pickle or the pyarrow IPC functionality instead."
    )
    # stacklevel=3 attributes the warning two frames above this helper.
    warnings.warn(template.format(name), FutureWarning, stacklevel=3)
29
30
def is_named_tuple(cls):
    """
    Return True if cls is a namedtuple class and False otherwise.

    A class qualifies when its only direct base is ``tuple`` and it carries
    a ``_fields`` attribute that is a tuple made up solely of strings.
    Anything that is not a class (for instance a namedtuple *instance*)
    yields False instead of raising AttributeError.

    Parameters
    ----------
    cls : object
        The candidate class to inspect.

    Returns
    -------
    bool
    """
    # Non-class objects have no __bases__; treat them as "not a namedtuple".
    bases = getattr(cls, "__bases__", None)
    if not isinstance(bases, tuple):
        return False
    # Types are compared by identity; exactly one base, and it must be tuple.
    if len(bases) != 1 or bases[0] is not tuple:
        return False
    fields = getattr(cls, "_fields", None)
    if not isinstance(fields, tuple):
        return False
    return all(isinstance(field_name, str) for field_name in fields)
42
43
class SerializationCallbackError(ArrowSerializationError):
    """
    Raised when an object cannot be converted by the serialization callback.

    The offending object is kept on the exception as ``example_object``.
    """

    def __init__(self, message, example_object):
        super().__init__(message)
        self.example_object = example_object
48
49
class DeserializationCallbackError(ArrowSerializationError):
    """
    Raised when the deserialization callback cannot rebuild an object.

    The type identifier that could not be handled is kept on the exception
    as ``type_id``.
    """

    def __init__(self, message, type_id):
        super().__init__(message)
        self.type_id = type_id
54
55
cdef class SerializationContext(_Weakrefable):
    """
    Registry of types, and of the callables used to convert them, for the
    deprecated pyarrow custom serialization functions.
    """
    cdef:
        # registered type -> type_id string
        object type_to_type_id
        # type_id string -> registered type
        object whitelisted_types
        # type_ids whose objects go through the pickle (de)serializer
        object types_to_pickle
        # type_id string -> user supplied serializer callable
        object custom_serializers
        # type_id string -> user supplied deserializer callable
        object custom_deserializers
        # callables used for pickled objects (see set_pickle)
        object pickle_serializer
        object pickle_deserializer

    def __init__(self):
        # Start with empty registries; pickle is the default fallback codec.
        self.type_to_type_id = {}
        self.whitelisted_types = {}
        self.types_to_pickle = set()
        self.custom_serializers = {}
        self.custom_deserializers = {}
        self.pickle_serializer = pickle.dumps
        self.pickle_deserializer = pickle.loads

    def set_pickle(self, serializer, deserializer):
        """
        Choose the callables used for objects that are to be pickled.

        Parameters
        ----------
        serializer : callable
            The serializer to use (e.g., pickle.dumps or cloudpickle.dumps).
        deserializer : callable
            The deserializer to use (e.g., pickle.loads or
            cloudpickle.loads).
        """
        self.pickle_serializer = serializer
        self.pickle_deserializer = deserializer

    def clone(self):
        """
        Return a copy of this SerializationContext.

        The registries are copied shallowly, so registering further types on
        the copy leaves this context untouched; the pickle callables are
        shared.

        Returns
        -------
        clone : SerializationContext
        """
        cdef SerializationContext duplicate = SerializationContext()

        duplicate.type_to_type_id = self.type_to_type_id.copy()
        duplicate.whitelisted_types = self.whitelisted_types.copy()
        duplicate.types_to_pickle = self.types_to_pickle.copy()
        duplicate.custom_serializers = self.custom_serializers.copy()
        duplicate.custom_deserializers = self.custom_deserializers.copy()
        duplicate.pickle_serializer = self.pickle_serializer
        duplicate.pickle_deserializer = self.pickle_deserializer
        return duplicate

    def register_type(self, type_, type_id, pickle=False,
                      custom_serializer=None, custom_deserializer=None):
        r"""
        EXPERIMENTAL: Add type to the list of types we can serialize.

        Parameters
        ----------
        type\_ : type
            The type that we can serialize.
        type_id : string
            A string used to identify the type.
        pickle : bool
            True if the serialization should be done with pickle.
            False if it should be done efficiently with Arrow.
        custom_serializer : callable
            This argument is optional, but can be provided to
            serialize objects of the class in a particular way.
        custom_deserializer : callable
            This argument is optional, but can be provided to
            deserialize objects of the class in a particular way.
            It is only recorded when custom_serializer is also given.

        Raises
        ------
        TypeError
            If type_id is not a string.
        """
        if not isinstance(type_id, str):
            raise TypeError("The type_id argument must be a string. The value "
                            "passed in has type {}.".format(type(type_id)))

        # Keep both directions of the type <-> type_id mapping in sync.
        self.type_to_type_id[type_] = type_id
        self.whitelisted_types[type_id] = type_
        if pickle:
            self.types_to_pickle.add(type_id)

        if custom_serializer is None:
            return
        self.custom_serializers[type_id] = custom_serializer
        self.custom_deserializers[type_id] = custom_deserializer

    def _serialize_callback(self, obj):
        # Walk the MRO so that the closest registered ancestor of type(obj)
        # decides how the object is converted.
        for type_ in type(obj).__mro__:
            if type_ in self.type_to_type_id:
                break
        else:
            raise SerializationCallbackError(
                "pyarrow does not know how to "
                "serialize objects of type {}.".format(type(obj)), obj
            )

        type_id = self.type_to_type_id[type_]
        if type_id in self.types_to_pickle:
            payload = {"data": self.pickle_serializer(obj),
                       "pickle": True}
        elif type_id in self.custom_serializers:
            payload = {"data": self.custom_serializers[type_id](obj)}
        elif is_named_tuple(type_):
            # namedtuples are rebuilt from their constructor arguments
            payload = {"_pa_getnewargs_": obj.__getnewargs__()}
        elif hasattr(obj, "__dict__"):
            payload = obj.__dict__
        else:
            msg = "We do not know how to serialize " \
                  "the object '{}'".format(obj)
            raise SerializationCallbackError(msg, obj)

        # A new dict is built here, so obj.__dict__ itself is never modified.
        return dict(payload, _pytype_=type_id)

    def _deserialize_callback(self, serialized_obj):
        type_id = serialized_obj["_pytype_"]
        if isinstance(type_id, bytes):
            # ARROW-4675: Python 2 serialized, read in Python 3
            type_id = frombytes(type_id)

        if "pickle" in serialized_obj:
            # The object was pickled, so unpickle it.
            return self.pickle_deserializer(serialized_obj["data"])

        assert type_id not in self.types_to_pickle
        if type_id not in self.whitelisted_types:
            msg = "Type ID " + type_id + " not registered in " \
                  "deserialization callback"
            raise DeserializationCallbackError(msg, type_id)

        type_ = self.whitelisted_types[type_id]
        if type_id in self.custom_deserializers:
            return self.custom_deserializers[type_id](serialized_obj["data"])

        if "_pa_getnewargs_" in serialized_obj:
            # Rebuild from the recorded constructor arguments.
            return type_.__new__(type_, *serialized_obj["_pa_getnewargs_"])

        # Otherwise serialized_obj is the instance __dict__ plus the
        # "_pytype_" marker, which is dropped before restoring the state.
        restored = type_.__new__(type_)
        serialized_obj.pop("_pytype_")
        restored.__dict__.update(serialized_obj)
        return restored

    def serialize(self, obj):
        """
        Call pyarrow.serialize with this SerializationContext as context.
        """
        return serialize(obj, context=self)

    def serialize_to(self, object value, sink):
        """
        Call pyarrow.serialize_to with this SerializationContext as context.
        """
        return serialize_to(value, sink, context=self)

    def deserialize(self, what):
        """
        Call pyarrow.deserialize with this SerializationContext as context.
        """
        return deserialize(what, context=self)

    def deserialize_components(self, what):
        """
        Call pyarrow.deserialize_components with this SerializationContext
        as context.
        """
        return deserialize_components(what, context=self)
229
230
# Module-wide context used whenever a caller does not supply one. Its
# handlers are registered lazily, on the first _get_default_context() call.
_default_serialization_context = SerializationContext()
_default_context_initialized = False


def _get_default_context():
    """
    Return the shared default SerializationContext.

    On the first call the default serialization handlers are registered on
    it; later calls return the same, already populated, object.
    """
    global _default_context_initialized
    from pyarrow.serialization import _register_default_serialization_handlers

    if _default_context_initialized:
        return _default_serialization_context

    _register_default_serialization_handlers(_default_serialization_context)
    _default_context_initialized = True
    return _default_serialization_context
243
244
cdef class SerializedPyObject(_Weakrefable):
    """
    Arrow-serialized representation of a Python object.

    Attributes
    ----------
    base : object
        Read-only from Python; handed to the deserializer as the base
        object when the payload is converted back.
    """
    cdef:
        CSerializedPyObject data

    cdef readonly:
        object base

    @property
    def total_bytes(self):
        """
        Size in bytes of the written-out payload, measured by writing it to
        a mock output stream.
        """
        cdef CMockOutputStream counting_stream

        with nogil:
            check_status(self.data.WriteTo(&counting_stream))
        return counting_stream.GetExtentBytesWritten()

    def write_to(self, sink):
        """
        Write the serialized object to a sink.
        """
        cdef shared_ptr[COutputStream] out_stream

        get_writer(sink, &out_stream)
        self._write_to(out_stream.get())

    cdef _write_to(self, COutputStream* stream):
        # The actual write runs without the GIL held.
        with nogil:
            check_status(self.data.WriteTo(stream))

    def deserialize(self, SerializationContext context=None):
        """
        Convert back to a Python object.

        Parameters
        ----------
        context : SerializationContext, default None
            Context to use; the default context is used when omitted.
        """
        cdef PyObject* raw_result

        if context is None:
            context = _get_default_context()

        with nogil:
            check_status(DeserializeObject(context, self.data,
                                           <PyObject*> self.base,
                                           &raw_result))

        # PyObject_to_object is necessary to avoid a memory leak
        wrapped = PyObject_to_object(raw_result)
        # the payload is a one-element list; hand back its sole item
        return wrapped[0]

    def to_buffer(self, nthreads=1):
        """
        Write the serialized data into a newly allocated Buffer.

        Parameters
        ----------
        nthreads : int, default 1
            When greater than 1, passed to the writer as its number of
            memcopy threads.
        """
        cdef Buffer destination = allocate_buffer(self.total_bytes)

        writer = FixedSizeBufferWriter(destination)
        if nthreads > 1:
            writer.set_memcopy_threads(nthreads)
        self.write_to(writer)
        return destination

    @staticmethod
    def from_components(components):
        """
        Rebuild a SerializedPyObject from the output of
        SerializedPyObject.to_components.

        Parameters
        ----------
        components : dict
            Must provide the keys 'num_tensors', 'num_ndarrays',
            'num_buffers', 'data' and 'num_sparse_tensors' (itself holding
            'coo', 'csr', 'csc', 'csf' and 'ndim_csf').
        """
        cdef:
            int num_tensors = components['num_tensors']
            int num_ndarrays = components['num_ndarrays']
            int num_buffers = components['num_buffers']
            list buffers = components['data']
            SparseTensorCounts num_sparse_tensors = SparseTensorCounts()
            SerializedPyObject rebuilt = SerializedPyObject()

        sparse_counts = components['num_sparse_tensors']
        num_sparse_tensors.coo = sparse_counts['coo']
        num_sparse_tensors.csr = sparse_counts['csr']
        num_sparse_tensors.csc = sparse_counts['csc']
        num_sparse_tensors.csf = sparse_counts['csf']
        num_sparse_tensors.ndim_csf = sparse_counts['ndim_csf']

        with nogil:
            check_status(GetSerializedFromComponents(num_tensors,
                                                     num_sparse_tensors,
                                                     num_ndarrays,
                                                     num_buffers,
                                                     buffers, &rebuilt.data))

        return rebuilt

    def to_components(self, memory_pool=None):
        """
        Return the decomposed dict representation of the serialized object
        containing a collection of Buffer objects which maximize opportunities
        for zero-copy.

        Parameters
        ----------
        memory_pool : MemoryPool default None
            Pool to use for necessary allocations.

        Returns
        -------
        components : object
            The decomposed representation, in the form accepted by
            SerializedPyObject.from_components.
        """
        cdef PyObject* raw_components
        cdef CMemoryPool* c_pool = maybe_unbox_memory_pool(memory_pool)

        with nogil:
            check_status(self.data.GetComponents(c_pool, &raw_components))

        return PyObject_to_object(raw_components)
354
355
def serialize(object value, SerializationContext context=None):
    """
    DEPRECATED: Serialize a general Python sequence for transient storage
    and transport.

    .. deprecated:: 2.0
        The custom serialization functionality is deprecated in pyarrow 2.0,
        and will be removed in a future version. Use the standard library
        ``pickle`` or the IPC functionality of pyarrow (see :ref:`ipc` for
        more).

    Notes
    -----
    The output is not compatible with the standard Arrow IPC binary
    protocol, so it cannot be read with ipc.open_stream or ipc.open_file.
    Read it back with deserialize, deserialize_from, or
    deserialize_components instead.

    Parameters
    ----------
    value : object
        Python object for the sequence that is to be serialized.
    context : SerializationContext
        Custom serialization and deserialization context, uses a default
        context with some standard type handlers if not specified.

    Returns
    -------
    serialized : SerializedPyObject

    """
    # Warn first, then hand over to the non-warning implementation.
    _deprecate_serialization("serialize")
    return _serialize(value, context=context)
389
390
def _serialize(object value, SerializationContext context=None):
    # Non-warning implementation behind serialize() and serialize_to().
    cdef SerializedPyObject output = SerializedPyObject()

    if context is None:
        context = _get_default_context()

    # The value travels inside a one-element list.
    boxed = [value]
    with nogil:
        check_status(SerializeObject(context, boxed, &output.data))
    return output
401
402
def serialize_to(object value, sink, SerializationContext context=None):
    """
    DEPRECATED: Serialize a Python sequence to a file.

    .. deprecated:: 2.0
        The custom serialization functionality is deprecated in pyarrow 2.0,
        and will be removed in a future version. Use the standard library
        ``pickle`` or the IPC functionality of pyarrow (see :ref:`ipc` for
        more).

    Parameters
    ----------
    value : object
        Python object for the sequence that is to be serialized.
    sink : NativeFile or file-like
        File the sequence will be written to.
    context : SerializationContext
        Custom serialization and deserialization context, uses a default
        context with some standard type handlers if not specified.
    """
    _deprecate_serialization("serialize_to")
    # Serialize in memory, then stream the result into the sink.
    _serialize(value, context).write_to(sink)
426
427
def read_serialized(source, base=None):
    """
    DEPRECATED: Read serialized Python sequence from file-like object.

    .. deprecated:: 2.0
        The custom serialization functionality is deprecated in pyarrow 2.0,
        and will be removed in a future version. Use the standard library
        ``pickle`` or the IPC functionality of pyarrow (see :ref:`ipc` for
        more).

    Parameters
    ----------
    source : NativeFile
        File to read the sequence from.
    base : object
        This object will be the base object of all the numpy arrays
        contained in the sequence.

    Returns
    -------
    serialized : the serialized data
    """
    # Warn first, then hand over to the non-warning implementation.
    _deprecate_serialization("read_serialized")
    loaded = _read_serialized(source, base=base)
    return loaded
452
453
def _read_serialized(source, base=None):
    # Non-warning implementation: read a SerializedPyObject from `source`
    # and record `base` on it.
    cdef shared_ptr[CRandomAccessFile] in_stream
    cdef SerializedPyObject loaded

    get_reader(source, True, &in_stream)

    loaded = SerializedPyObject()
    loaded.base = base
    with nogil:
        check_status(ReadSerializedObject(in_stream.get(), &loaded.data))

    return loaded
464
465
def deserialize_from(source, object base, SerializationContext context=None):
    """
    DEPRECATED: Deserialize a Python sequence from a file.

    .. deprecated:: 2.0
        The custom serialization functionality is deprecated in pyarrow 2.0,
        and will be removed in a future version. Use the standard library
        ``pickle`` or the IPC functionality of pyarrow (see :ref:`ipc` for
        more).

    Only data produced by pyarrow.serialize or pyarrow.serialize_to can be
    read with this function.

    Parameters
    ----------
    source : NativeFile
        File to read the sequence from.
    base : object
        This object will be the base object of all the numpy arrays
        contained in the sequence.
    context : SerializationContext
        Custom serialization and deserialization context.

    Returns
    -------
    object
        Python object for the deserialized sequence.
    """
    _deprecate_serialization("deserialize_from")
    # Read the serialized form, then convert it back to a Python object.
    return _read_serialized(source, base=base).deserialize(context)
497
498
def deserialize_components(components, SerializationContext context=None):
    """
    DEPRECATED: Reconstruct Python object from output of
    SerializedPyObject.to_components.

    .. deprecated:: 2.0
        The custom serialization functionality is deprecated in pyarrow 2.0,
        and will be removed in a future version. Use the standard library
        ``pickle`` or the IPC functionality of pyarrow (see :ref:`ipc` for
        more).

    Parameters
    ----------
    components : dict
        Output of SerializedPyObject.to_components
    context : SerializationContext, default None

    Returns
    -------
    object : the Python object that was originally serialized
    """
    _deprecate_serialization("deserialize_components")
    # Reassemble the serialized form, then convert it back.
    rebuilt = SerializedPyObject.from_components(components)
    return rebuilt.deserialize(context)
523
524
def deserialize(obj, SerializationContext context=None):
    """
    DEPRECATED: Deserialize Python object from Buffer or other Python
    object supporting the buffer protocol.

    .. deprecated:: 2.0
        The custom serialization functionality is deprecated in pyarrow 2.0,
        and will be removed in a future version. Use the standard library
        ``pickle`` or the IPC functionality of pyarrow (see :ref:`ipc` for
        more).

    Only data produced by pyarrow.serialize or pyarrow.serialize_to can be
    read with this function.

    Parameters
    ----------
    obj : pyarrow.Buffer or Python object supporting buffer protocol
    context : SerializationContext
        Custom serialization and deserialization context.

    Returns
    -------
    deserialized : object
    """
    # Warn first, then hand over to the non-warning implementation.
    _deprecate_serialization("deserialize")
    restored = _deserialize(obj, context=context)
    return restored
551
552
def _deserialize(obj, SerializationContext context=None):
    # Non-warning implementation behind deserialize(): `obj` is wrapped in a
    # BufferReader and also passed along as the base object.
    reader = BufferReader(obj)
    return _read_serialized(reader, base=obj).deserialize(context)