]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | # Licensed to the Apache Software Foundation (ASF) under one |
2 | # or more contributor license agreements. See the NOTICE file | |
3 | # distributed with this work for additional information | |
4 | # regarding copyright ownership. The ASF licenses this file | |
5 | # to you under the Apache License, Version 2.0 (the | |
6 | # "License"); you may not use this file except in compliance | |
7 | # with the License. You may obtain a copy of the License at | |
8 | # | |
9 | # http://www.apache.org/licenses/LICENSE-2.0 | |
10 | # | |
11 | # Unless required by applicable law or agreed to in writing, | |
12 | # software distributed under the License is distributed on an | |
13 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | # KIND, either express or implied. See the License for the | |
15 | # specific language governing permissions and limitations | |
16 | # under the License. | |
17 | ||
18 | from collections import namedtuple | |
19 | import warnings | |
20 | ||
21 | ||
# Python-visible mirror of the C++ arrow::ipc::MetadataVersion enum.
# Each member's value is cast from the corresponding C constant, so
# converting in either direction is a plain numeric cast (see
# _wrap_metadata_version / _unwrap_metadata_version).
cpdef enum MetadataVersion:
    V1 = <char> CMetadataVersion_V1
    V2 = <char> CMetadataVersion_V2
    V3 = <char> CMetadataVersion_V3
    V4 = <char> CMetadataVersion_V4
    V5 = <char> CMetadataVersion_V5
28 | ||
29 | ||
cdef object _wrap_metadata_version(CMetadataVersion version):
    # C++ -> Python: enum values are numerically identical, so a cast
    # followed by an enum lookup is sufficient.
    return MetadataVersion(<char> version)
32 | ||
33 | ||
cdef CMetadataVersion _unwrap_metadata_version(
        MetadataVersion version) except *:
    # Python -> C++: map each enum member back to its C constant.
    # `except *` lets the ValueError below propagate through the C
    # return type.
    if version == MetadataVersion.V1:
        return CMetadataVersion_V1
    elif version == MetadataVersion.V2:
        return CMetadataVersion_V2
    elif version == MetadataVersion.V3:
        return CMetadataVersion_V3
    elif version == MetadataVersion.V4:
        return CMetadataVersion_V4
    elif version == MetadataVersion.V5:
        return CMetadataVersion_V5
    raise ValueError("Not a metadata version: " + repr(version))
47 | ||
48 | ||
# Field layout shared by the WriteStats wrapper class below.
_WriteStats = namedtuple(
    'WriteStats',
    [
        'num_messages',
        'num_record_batches',
        'num_dictionary_batches',
        'num_dictionary_deltas',
        'num_replaced_dictionaries',
    ])


class WriteStats(_WriteStats):
    """IPC write statistics

    Parameters
    ----------
    num_messages : number of messages.
    num_record_batches : number of record batches.
    num_dictionary_batches : number of dictionary batches.
    num_dictionary_deltas : delta of dictionaries.
    num_replaced_dictionaries : number of replaced dictionaries.
    """
    # Keep instances tuple-sized: no per-instance __dict__.
    __slots__ = ()
67 | ||
68 | ||
@staticmethod
cdef _wrap_write_stats(CIpcWriteStats c):
    # Convert the C-level CIpcWriteStats struct into the Python-visible
    # WriteStats namedtuple (field order matches _WriteStats).
    return WriteStats(c.num_messages, c.num_record_batches,
                      c.num_dictionary_batches, c.num_dictionary_deltas,
                      c.num_replaced_dictionaries)
74 | ||
75 | ||
# Field layout shared by the ReadStats wrapper class below.
_ReadStats = namedtuple(
    'ReadStats',
    [
        'num_messages',
        'num_record_batches',
        'num_dictionary_batches',
        'num_dictionary_deltas',
        'num_replaced_dictionaries',
    ])


class ReadStats(_ReadStats):
    """IPC read statistics

    Parameters
    ----------
    num_messages : number of messages.
    num_record_batches : number of record batches.
    num_dictionary_batches : number of dictionary batches.
    num_dictionary_deltas : delta of dictionaries.
    num_replaced_dictionaries : number of replaced dictionaries.
    """
    # Keep instances tuple-sized: no per-instance __dict__.
    __slots__ = ()
94 | ||
95 | ||
@staticmethod
cdef _wrap_read_stats(CIpcReadStats c):
    # Convert the C-level CIpcReadStats struct into the Python-visible
    # ReadStats namedtuple (field order matches _ReadStats).
    return ReadStats(c.num_messages, c.num_record_batches,
                     c.num_dictionary_batches, c.num_dictionary_deltas,
                     c.num_replaced_dictionaries)
101 | ||
102 | ||
cdef class IpcWriteOptions(_Weakrefable):
    """
    Serialization options for the IPC format.

    Parameters
    ----------
    metadata_version : MetadataVersion, default MetadataVersion.V5
        The metadata version to write. V5 is the current and latest,
        V4 is the pre-1.0 metadata version (with incompatible Union layout).
    allow_64bit : bool, default False
        If true, allow field lengths that don't fit in a signed 32-bit int.
    use_legacy_format : bool, default False
        Whether to use the pre-Arrow 0.15 IPC format.
    compression : str, Codec, or None
        compression codec to use for record batch buffers.
        If None then batch buffers will be uncompressed.
        Must be "lz4", "zstd" or None.
        To specify a compression_level use `pyarrow.Codec`
    use_threads : bool
        Whether to use the global CPU thread pool to parallelize any
        computational tasks like compression.
    emit_dictionary_deltas : bool
        Whether to emit dictionary deltas. Default is false for maximum
        stream compatibility.
    """
    __slots__ = ()

    # cdef block is in lib.pxd

    def __init__(self, *, metadata_version=MetadataVersion.V5,
                 bint allow_64bit=False, use_legacy_format=False,
                 compression=None, bint use_threads=True,
                 bint emit_dictionary_deltas=False):
        # Start from the C++ defaults, then route every keyword through
        # the matching property setter so that all validation/conversion
        # logic lives in one place (the setters below).
        self.c_options = CIpcWriteOptions.Defaults()
        self.allow_64bit = allow_64bit
        self.use_legacy_format = use_legacy_format
        self.metadata_version = metadata_version
        if compression is not None:
            self.compression = compression
        self.use_threads = use_threads
        self.emit_dictionary_deltas = emit_dictionary_deltas

    @property
    def allow_64bit(self):
        # Mirrors CIpcWriteOptions::allow_64bit.
        return self.c_options.allow_64bit

    @allow_64bit.setter
    def allow_64bit(self, bint value):
        self.c_options.allow_64bit = value

    @property
    def use_legacy_format(self):
        # Mirrors CIpcWriteOptions::write_legacy_ipc_format.
        return self.c_options.write_legacy_ipc_format

    @use_legacy_format.setter
    def use_legacy_format(self, bint value):
        self.c_options.write_legacy_ipc_format = value

    @property
    def metadata_version(self):
        return _wrap_metadata_version(self.c_options.metadata_version)

    @metadata_version.setter
    def metadata_version(self, value):
        # _unwrap_metadata_version raises ValueError for unknown values.
        self.c_options.metadata_version = _unwrap_metadata_version(value)

    @property
    def compression(self):
        # Returns the codec name as a str, or None when uncompressed.
        if self.c_options.codec == nullptr:
            return None
        else:
            return frombytes(self.c_options.codec.get().name())

    @compression.setter
    def compression(self, value):
        if value is None:
            # Drop any previously-set codec: write uncompressed.
            self.c_options.codec.reset()
        elif isinstance(value, str):
            # Validate the codec name and create a codec with default
            # settings; ownership transfers into the shared_ptr.
            self.c_options.codec = shared_ptr[CCodec](GetResultValue(
                CCodec.Create(_ensure_compression(value))).release())
        elif isinstance(value, Codec):
            # Share ownership of a caller-constructed codec (this is how
            # a custom compression level is supplied).
            self.c_options.codec = (<Codec>value).wrapped
        else:
            raise TypeError(
                "Property `compression` must be None, str, or pyarrow.Codec")

    @property
    def use_threads(self):
        return self.c_options.use_threads

    @use_threads.setter
    def use_threads(self, bint value):
        self.c_options.use_threads = value

    @property
    def emit_dictionary_deltas(self):
        return self.c_options.emit_dictionary_deltas

    @emit_dictionary_deltas.setter
    def emit_dictionary_deltas(self, bint value):
        self.c_options.emit_dictionary_deltas = value
204 | ||
205 | ||
cdef class Message(_Weakrefable):
    """
    Container for an Arrow IPC message with metadata and optional body
    """

    def __cinit__(self):
        pass

    def __init__(self):
        # Messages are only created internally (read_message,
        # MessageReader); direct construction is forbidden.
        raise TypeError("Do not call {}'s constructor directly, use "
                        "`pyarrow.ipc.read_message` function instead."
                        .format(self.__class__.__name__))

    @property
    def type(self):
        # Human-readable message type name, formatted by the C++ helper.
        return frombytes(FormatMessageType(self.message.get().type()))

    @property
    def metadata(self):
        # The message's Flatbuffer metadata, wrapped as a Buffer.
        return pyarrow_wrap_buffer(self.message.get().metadata())

    @property
    def metadata_version(self):
        return _wrap_metadata_version(self.message.get().metadata_version())

    @property
    def body(self):
        # The message body as a Buffer, or None when the message has no
        # body (the C++ side returns a null buffer in that case).
        cdef shared_ptr[CBuffer] body = self.message.get().body()
        if body.get() == NULL:
            return None
        else:
            return pyarrow_wrap_buffer(body)

    def equals(self, Message other):
        """
        Returns True if the message contents (metadata and body) are identical

        Parameters
        ----------
        other : Message

        Returns
        -------
        are_equal : bool
        """
        cdef c_bool result
        with nogil:
            result = self.message.get().Equals(deref(other.message.get()))
        return result

    def serialize_to(self, NativeFile sink, alignment=8, memory_pool=None):
        """
        Write message to generic OutputStream

        Parameters
        ----------
        sink : NativeFile
        alignment : int, default 8
            Byte alignment for metadata and body
        memory_pool : MemoryPool, default None
            Uses default memory pool if not specified
        """
        # NOTE(review): memory_pool is accepted but never referenced in
        # this body — serialization writes straight to `sink` using a
        # default-constructed CIpcWriteOptions with only the alignment
        # overridden. Confirm whether the parameter is intentional.
        cdef:
            int64_t output_length = 0
            COutputStream* out
            CIpcWriteOptions options

        options.alignment = alignment
        out = sink.get_output_stream().get()
        with nogil:
            check_status(self.message.get()
                         .SerializeTo(out, options, &output_length))

    def serialize(self, alignment=8, memory_pool=None):
        """
        Write message as encapsulated IPC message

        Parameters
        ----------
        alignment : int, default 8
            Byte alignment for metadata and body
        memory_pool : MemoryPool, default None
            Uses default memory pool if not specified

        Returns
        -------
        serialized : Buffer
        """
        # Serialize into an in-memory stream and hand back the buffer.
        stream = BufferOutputStream(memory_pool)
        self.serialize_to(stream, alignment=alignment, memory_pool=memory_pool)
        return stream.getvalue()

    def __repr__(self):
        # An uninitialized Message (e.g. mid-construction) has a null
        # C++ message pointer.
        if self.message == nullptr:
            return """pyarrow.Message(uninitialized)"""

        metadata_len = self.metadata.size
        body = self.body
        body_len = 0 if body is None else body.size

        return """pyarrow.Message
type: {0}
metadata length: {1}
body length: {2}""".format(self.type, metadata_len, body_len)
310 | ||
311 | ||
cdef class MessageReader(_Weakrefable):
    """
    Interface for reading Message objects from some source (like an
    InputStream)
    """
    cdef:
        unique_ptr[CMessageReader] reader

    def __cinit__(self):
        pass

    def __init__(self):
        # Instances must be created via the open_stream() factory below.
        raise TypeError("Do not call {}'s constructor directly, use "
                        "`pyarrow.ipc.MessageReader.open_stream` function "
                        "instead.".format(self.__class__.__name__))

    @staticmethod
    def open_stream(source):
        """
        Open stream from source.

        Parameters
        ----------
        source : a readable source, like an InputStream
        """
        cdef:
            # __new__ bypasses __init__, which raises by design.
            MessageReader result = MessageReader.__new__(MessageReader)
            shared_ptr[CInputStream] in_stream
            unique_ptr[CMessageReader] reader

        _get_input_stream(source, &in_stream)
        with nogil:
            reader = CMessageReader.Open(in_stream)
            result.reader.reset(reader.release())

        return result

    def __iter__(self):
        # Terminates when read_next_message() raises StopIteration at
        # end of stream.
        while True:
            yield self.read_next_message()

    def read_next_message(self):
        """
        Read next Message from the stream.

        Raises
        ------
        StopIteration : at end of stream
        """
        cdef Message result = Message.__new__(Message)

        with nogil:
            result.message = move(GetResultValue(self.reader.get()
                                                 .ReadNextMessage()))

        # A null message is the C++ convention for end-of-stream.
        if result.message.get() == NULL:
            raise StopIteration

        return result
371 | ||
372 | # ---------------------------------------------------------------------- | |
373 | # File and stream readers and writers | |
374 | ||
cdef class _CRecordBatchWriter(_Weakrefable):
    """The base RecordBatchWriter wrapper.

    Provides common implementations of convenience methods. Should not
    be instantiated directly by user code.
    """

    # cdef block is in lib.pxd

    def write(self, table_or_batch):
        """
        Write RecordBatch or Table to stream.

        Parameters
        ----------
        table_or_batch : {RecordBatch, Table}
        """
        # Dispatch on the argument type; anything else is rejected.
        if isinstance(table_or_batch, RecordBatch):
            self.write_batch(table_or_batch)
        elif isinstance(table_or_batch, Table):
            self.write_table(table_or_batch)
        else:
            raise ValueError(type(table_or_batch))

    def write_batch(self, RecordBatch batch):
        """
        Write RecordBatch to stream.

        Parameters
        ----------
        batch : RecordBatch
        """
        with nogil:
            check_status(self.writer.get()
                         .WriteRecordBatch(deref(batch.batch)))

    def write_table(self, Table table, max_chunksize=None, **kwargs):
        """
        Write Table to stream in (contiguous) RecordBatch objects.

        Parameters
        ----------
        table : Table
        max_chunksize : int, default None
            Maximum size for RecordBatch chunks. Individual chunks may be
            smaller depending on the chunk layout of individual columns.
        """
        cdef:
            # max_chunksize must be > 0 to have any impact
            int64_t c_max_chunksize = -1

        # Backwards compatibility: `chunksize` was renamed to
        # `max_chunksize` in 0.15; accept the old spelling with a warning.
        if 'chunksize' in kwargs:
            max_chunksize = kwargs['chunksize']
            msg = ('The parameter chunksize is deprecated for the write_table '
                   'methods as of 0.15, please use parameter '
                   'max_chunksize instead')
            warnings.warn(msg, FutureWarning)

        if max_chunksize is not None:
            c_max_chunksize = max_chunksize

        with nogil:
            check_status(self.writer.get().WriteTable(table.table[0],
                                                      c_max_chunksize))

    def close(self):
        """
        Close stream and write end-of-stream 0 marker.
        """
        with nogil:
            check_status(self.writer.get().Close())

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always close on context-manager exit, even after an exception.
        self.close()

    @property
    def stats(self):
        """
        Current IPC write statistics.
        """
        if not self.writer:
            raise ValueError("Operation on closed writer")
        return _wrap_write_stats(self.writer.get().stats())
461 | ||
462 | ||
cdef class _RecordBatchStreamWriter(_CRecordBatchWriter):
    """Internal writer for the IPC streaming format."""
    cdef:
        CIpcWriteOptions options
        bint closed

    def __cinit__(self):
        pass

    def __dealloc__(self):
        pass

    @property
    def _use_legacy_format(self):
        # For testing (see test_ipc.py)
        return self.options.write_legacy_ipc_format

    @property
    def _metadata_version(self):
        # For testing (see test_ipc.py)
        return _wrap_metadata_version(self.options.metadata_version)

    def _open(self, sink, Schema schema not None,
              IpcWriteOptions options=IpcWriteOptions()):
        # Bind `sink` as an output stream and create the stream-format
        # writer, keeping a copy of the caller's options.
        cdef:
            shared_ptr[COutputStream] c_sink

        self.options = options.c_options
        get_writer(sink, &c_sink)
        with nogil:
            self.writer = GetResultValue(
                MakeStreamWriter(c_sink, schema.sp_schema,
                                 self.options))
495 | ||
496 | ||
cdef _get_input_stream(object source, shared_ptr[CInputStream]* out):
    # Prefer a zero-copy buffer view of `source` when it is buffer-like;
    # otherwise fall back to the generic input-stream coercion.
    try:
        source = as_buffer(source)
    except TypeError:
        # Non-buffer-like
        pass

    get_input_stream(source, True, out)
505 | ||
506 | ||
507 | class _ReadPandasMixin: | |
508 | ||
509 | def read_pandas(self, **options): | |
510 | """ | |
511 | Read contents of stream to a pandas.DataFrame. | |
512 | ||
513 | Read all record batches as a pyarrow.Table then convert it to a | |
514 | pandas.DataFrame using Table.to_pandas. | |
515 | ||
516 | Parameters | |
517 | ---------- | |
518 | **options : arguments to forward to Table.to_pandas | |
519 | ||
520 | Returns | |
521 | ------- | |
522 | df : pandas.DataFrame | |
523 | """ | |
524 | table = self.read_all() | |
525 | return table.to_pandas(**options) | |
526 | ||
527 | ||
cdef class RecordBatchReader(_Weakrefable):
    """Base class for reading stream of record batches.

    Provides common implementations of convenience methods. Should not
    be instantiated directly by user code.
    """

    # cdef block is in lib.pxd

    def __iter__(self):
        # Iterate until read_next_batch() signals end-of-stream.
        while True:
            try:
                yield self.read_next_batch()
            except StopIteration:
                return

    @property
    def schema(self):
        """
        Shared schema of the record batches in the stream.
        """
        cdef shared_ptr[CSchema] c_schema

        with nogil:
            c_schema = self.reader.get().schema()

        return pyarrow_wrap_schema(c_schema)

    def get_next_batch(self):
        """
        DEPRECATED: read the next RecordBatch from the stream.

        Use read_next_batch instead.
        """
        # `warnings` is already imported at module level; the redundant
        # function-local import that used to live here was removed.
        warnings.warn('Please use read_next_batch instead of '
                      'get_next_batch', FutureWarning)
        return self.read_next_batch()

    def read_next_batch(self):
        """
        Read next RecordBatch from the stream.

        Raises
        ------
        StopIteration:
            At end of stream.
        """
        cdef shared_ptr[CRecordBatch] batch

        with nogil:
            check_status(self.reader.get().ReadNext(&batch))

        # A null batch is the C++ convention for end-of-stream.
        if batch.get() == NULL:
            raise StopIteration

        return pyarrow_wrap_batch(batch)

    def read_all(self):
        """
        Read all record batches as a pyarrow.Table.
        """
        cdef shared_ptr[CTable] table
        with nogil:
            check_status(self.reader.get().ReadAll(&table))
        return pyarrow_wrap_table(table)

    read_pandas = _ReadPandasMixin.read_pandas

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def _export_to_c(self, uintptr_t out_ptr):
        """
        Export to a C ArrowArrayStream struct, given its pointer.

        Parameters
        ----------
        out_ptr: int
            The raw pointer to a C ArrowArrayStream struct.

        Be careful: if you don't pass the ArrowArrayStream struct to a
        consumer, array memory will leak. This is a low-level function
        intended for expert users.
        """
        with nogil:
            check_status(ExportRecordBatchReader(
                self.reader, <ArrowArrayStream*> out_ptr))

    @staticmethod
    def _import_from_c(uintptr_t in_ptr):
        """
        Import RecordBatchReader from a C ArrowArrayStream struct,
        given its pointer.

        Parameters
        ----------
        in_ptr: int
            The raw pointer to a C ArrowArrayStream struct.

        This is a low-level function intended for expert users.
        """
        cdef:
            shared_ptr[CRecordBatchReader] c_reader
            RecordBatchReader self

        with nogil:
            c_reader = GetResultValue(ImportRecordBatchReader(
                <ArrowArrayStream*> in_ptr))

        # __new__ bypasses any __init__ checks; the reader is set below.
        self = RecordBatchReader.__new__(RecordBatchReader)
        self.reader = c_reader
        return self

    @staticmethod
    def from_batches(schema, batches):
        """
        Create RecordBatchReader from an iterable of batches.

        Parameters
        ----------
        schema : Schema
            The shared schema of the record batches
        batches : Iterable[RecordBatch]
            The batches that this reader will return.

        Returns
        -------
        reader : RecordBatchReader
        """
        cdef:
            shared_ptr[CSchema] c_schema
            shared_ptr[CRecordBatchReader] c_reader
            RecordBatchReader self

        c_schema = pyarrow_unwrap_schema(schema)
        c_reader = GetResultValue(CPyRecordBatchReader.Make(
            c_schema, batches))

        self = RecordBatchReader.__new__(RecordBatchReader)
        self.reader = c_reader
        return self
668 | ||
669 | ||
cdef class _RecordBatchStreamReader(RecordBatchReader):
    """Internal reader for the IPC streaming format."""
    cdef:
        shared_ptr[CInputStream] in_stream
        CIpcReadOptions options
        CRecordBatchStreamReader* stream_reader

    def __cinit__(self):
        pass

    def _open(self, source):
        # Coerce `source` into an input stream and open the stream reader.
        _get_input_stream(source, &self.in_stream)
        with nogil:
            self.reader = GetResultValue(CRecordBatchStreamReader.Open(
                self.in_stream, self.options))
        # Keep a raw downcast pointer for stream-reader-specific APIs
        # (e.g. stats); ownership remains with self.reader.
        self.stream_reader = <CRecordBatchStreamReader*> self.reader.get()

    @property
    def stats(self):
        """
        Current IPC read statistics.
        """
        if not self.reader:
            raise ValueError("Operation on closed reader")
        return _wrap_read_stats(self.stream_reader.stats())
694 | ||
695 | ||
cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter):
    """Internal writer for the IPC file (random-access) format."""

    def _open(self, sink, Schema schema not None,
              IpcWriteOptions options=IpcWriteOptions()):
        # Same setup as the stream writer, but creates a file-format
        # writer (MakeFileWriter) instead of a stream-format one.
        cdef:
            shared_ptr[COutputStream] c_sink

        self.options = options.c_options
        get_writer(sink, &c_sink)
        with nogil:
            self.writer = GetResultValue(
                MakeFileWriter(c_sink, schema.sp_schema, self.options))
708 | ||
709 | ||
cdef class _RecordBatchFileReader(_Weakrefable):
    """Internal reader for the IPC file (random-access) format."""
    cdef:
        shared_ptr[CRecordBatchFileReader] reader
        shared_ptr[CRandomAccessFile] file
        CIpcReadOptions options

    cdef readonly:
        # Schema read from the file footer, set by _open().
        Schema schema

    def __cinit__(self):
        pass

    def _open(self, source, footer_offset=None):
        # Prefer a zero-copy buffer view of `source` when possible.
        try:
            source = as_buffer(source)
        except TypeError:
            pass

        get_reader(source, True, &self.file)

        cdef int64_t offset = 0
        if footer_offset is not None:
            offset = footer_offset

        with nogil:
            # Open2 takes an explicit footer offset (for an IPC file
            # embedded in a larger stream); Open expects the footer at
            # the end of the file.
            if offset != 0:
                self.reader = GetResultValue(
                    CRecordBatchFileReader.Open2(self.file.get(), offset,
                                                 self.options))

            else:
                self.reader = GetResultValue(
                    CRecordBatchFileReader.Open(self.file.get(),
                                                self.options))

        self.schema = pyarrow_wrap_schema(self.reader.get().schema())

    @property
    def num_record_batches(self):
        # Number of record batches recorded in the file footer.
        return self.reader.get().num_record_batches()

    def get_batch(self, int i):
        """
        Read record batch number `i` from the file.

        Raises
        ------
        ValueError
            If `i` is out of range.
        """
        cdef shared_ptr[CRecordBatch] batch

        if i < 0 or i >= self.num_record_batches:
            raise ValueError('Batch number {0} out of range'.format(i))

        with nogil:
            batch = GetResultValue(self.reader.get().ReadRecordBatch(i))

        return pyarrow_wrap_batch(batch)

    # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
    # time has passed
    get_record_batch = get_batch

    def read_all(self):
        """
        Read all record batches as a pyarrow.Table
        """
        cdef:
            vector[shared_ptr[CRecordBatch]] batches
            shared_ptr[CTable] table
            int i, nbatches

        nbatches = self.num_record_batches

        # Read every batch, then assemble them into a single Table under
        # the footer schema.
        batches.resize(nbatches)
        with nogil:
            for i in range(nbatches):
                batches[i] = GetResultValue(self.reader.get()
                                            .ReadRecordBatch(i))
            table = GetResultValue(
                CTable.FromRecordBatches(self.schema.sp_schema, move(batches)))

        return pyarrow_wrap_table(table)

    read_pandas = _ReadPandasMixin.read_pandas

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    @property
    def stats(self):
        """
        Current IPC read statistics.
        """
        if not self.reader:
            raise ValueError("Operation on closed reader")
        return _wrap_read_stats(self.reader.get().stats())
803 | ||
804 | ||
def get_tensor_size(Tensor tensor):
    """
    Return total size of serialized Tensor including metadata and padding.

    Parameters
    ----------
    tensor : Tensor
        The tensor for which we want to know the size.
    """
    cdef int64_t size
    with nogil:
        check_status(GetTensorSize(deref(tensor.tp), &size))
    return size
818 | ||
819 | ||
def get_record_batch_size(RecordBatch batch):
    """
    Return total size of serialized RecordBatch including metadata and padding.

    Parameters
    ----------
    batch : RecordBatch
        The recordbatch for which we want to know the size.
    """
    cdef int64_t size
    with nogil:
        check_status(GetRecordBatchSize(deref(batch.batch), &size))
    return size
833 | ||
834 | ||
def write_tensor(Tensor tensor, NativeFile dest):
    """
    Write pyarrow.Tensor to pyarrow.NativeFile object at its current position.

    Parameters
    ----------
    tensor : pyarrow.Tensor
    dest : pyarrow.NativeFile

    Returns
    -------
    bytes_written : int
        Total number of bytes written to the file
    """
    cdef:
        int32_t metadata_length
        int64_t body_length

    handle = dest.get_output_stream()

    with nogil:
        check_status(
            WriteTensor(deref(tensor.tp), handle.get(),
                        &metadata_length, &body_length))

    # Total bytes written = metadata (including padding) + body.
    return metadata_length + body_length
861 | ||
862 | ||
cdef NativeFile as_native_file(source):
    # Coerce `source` into a NativeFile: already one -> pass through;
    # file-like (has .read) -> wrap in PythonFile; otherwise treat it as
    # a buffer.
    if isinstance(source, NativeFile):
        return source

    if hasattr(source, 'read'):
        wrapped = PythonFile(source)
    else:
        wrapped = BufferReader(source)

    if not isinstance(wrapped, NativeFile):
        raise ValueError('Unable to read message from object with type: {0}'
                         .format(type(wrapped)))
    return wrapped
874 | ||
875 | ||
def read_tensor(source):
    """Read pyarrow.Tensor from pyarrow.NativeFile object from current
    position. If the file source supports zero copy (e.g. a memory map), then
    this operation does not allocate any memory. This function does not
    assume that the stream is aligned

    Parameters
    ----------
    source : pyarrow.NativeFile

    Returns
    -------
    tensor : Tensor

    """
    cdef:
        shared_ptr[CTensor] sp_tensor
        CInputStream* c_stream
        # Coerce file-like/buffer-like sources into a NativeFile.
        NativeFile nf = as_native_file(source)

    c_stream = nf.get_input_stream().get()
    with nogil:
        sp_tensor = GetResultValue(ReadTensor(c_stream))
    return pyarrow_wrap_tensor(sp_tensor)
900 | ||
901 | ||
def read_message(source):
    """
    Read length-prefixed message from file or buffer-like object

    Parameters
    ----------
    source : pyarrow.NativeFile, file-like object, or buffer-like object

    Returns
    -------
    message : Message

    Raises
    ------
    EOFError
        At end of the Arrow stream.
    """
    cdef:
        # __new__ bypasses Message.__init__, which raises by design.
        Message result = Message.__new__(Message)
        CInputStream* c_stream

    cdef NativeFile nf = as_native_file(source)
    c_stream = nf.get_input_stream().get()

    with nogil:
        result.message = move(
            GetResultValue(ReadMessage(c_stream, c_default_memory_pool())))

    # ReadMessage yields a null message at end-of-stream.
    if result.message == nullptr:
        raise EOFError("End of Arrow stream")

    return result
929 | ||
930 | ||
def read_schema(obj, DictionaryMemo dictionary_memo=None):
    """
    Read Schema from message or buffer

    Parameters
    ----------
    obj : buffer or Message
    dictionary_memo : DictionaryMemo, optional
        Needed to be able to reconstruct dictionary-encoded fields
        with read_record_batch

    Returns
    -------
    schema : Schema
    """
    cdef:
        shared_ptr[CSchema] result
        shared_ptr[CRandomAccessFile] cpp_file
        CDictionaryMemo temp_memo
        CDictionaryMemo* arg_dict_memo

    # Reading a schema out of an already-parsed Message is not
    # implemented; only buffer-like input is supported here.
    if isinstance(obj, Message):
        raise NotImplementedError(type(obj))

    get_reader(obj, True, &cpp_file)

    # Use the caller's memo when given (so dictionary-encoded fields can
    # be resolved later by read_record_batch); otherwise a throwaway one.
    if dictionary_memo is not None:
        arg_dict_memo = dictionary_memo.memo
    else:
        arg_dict_memo = &temp_memo

    with nogil:
        result = GetResultValue(ReadSchema(cpp_file.get(), arg_dict_memo))

    return pyarrow_wrap_schema(result)
966 | ||
967 | ||
def read_record_batch(obj, Schema schema,
                      DictionaryMemo dictionary_memo=None):
    """
    Read RecordBatch from message, given a known schema. If reading data from a
    complete IPC stream, use ipc.open_stream instead

    Parameters
    ----------
    obj : Message or Buffer-like
    schema : Schema
    dictionary_memo : DictionaryMemo, optional
        If message contains dictionaries, must pass a populated
        DictionaryMemo

    Returns
    -------
    batch : RecordBatch
    """
    cdef:
        shared_ptr[CRecordBatch] result
        Message message
        CDictionaryMemo temp_memo
        CDictionaryMemo* arg_dict_memo

    # Accept either a pre-read Message or anything read_message() accepts.
    if isinstance(obj, Message):
        message = obj
    else:
        message = read_message(obj)

    # Without a caller-provided memo, dictionary lookups use an empty
    # throwaway memo.
    if dictionary_memo is not None:
        arg_dict_memo = dictionary_memo.memo
    else:
        arg_dict_memo = &temp_memo

    with nogil:
        result = GetResultValue(
            ReadRecordBatch(deref(message.message.get()),
                            schema.sp_schema,
                            arg_dict_memo,
                            CIpcReadOptions.Defaults()))

    return pyarrow_wrap_batch(result)