# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import decimal
import io

import numpy as np
import pytest

import pyarrow as pa
from pyarrow.tests import util
from pyarrow.tests.parquet.common import (_check_roundtrip,
                                          parametrize_legacy_dataset)

try:
    import pyarrow.parquet as pq
    from pyarrow.tests.parquet.common import _read_table, _write_table
except ImportError:
    pq = None


try:
    import pandas as pd
    import pandas.testing as tm

    from pyarrow.tests.pandas_examples import (dataframe_with_arrays,
                                               dataframe_with_lists)
    from pyarrow.tests.parquet.common import alltypes_sample
except ImportError:
    pd = tm = None


pytestmark = pytest.mark.parquet


# General roundtrip of data types
# -----------------------------------------------------------------------------


@pytest.mark.pandas
@parametrize_legacy_dataset
@pytest.mark.parametrize('chunk_size', [None, 1000])
def test_parquet_2_0_roundtrip(tempdir, chunk_size, use_legacy_dataset):
    df = alltypes_sample(size=10000, categorical=True)

    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df)
    assert arrow_table.schema.pandas_metadata is not None

    _write_table(arrow_table, filename, version='2.6',
                 coerce_timestamps='ms', chunk_size=chunk_size)
    table_read = pq.read_pandas(
        filename, use_legacy_dataset=use_legacy_dataset)
    assert table_read.schema.pandas_metadata is not None

    read_metadata = table_read.schema.metadata
    assert arrow_table.schema.metadata == read_metadata

    df_read = table_read.to_pandas()
    tm.assert_frame_equal(df, df_read)
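
# Note: version='2.6' selects the Parquet format version, and
# coerce_timestamps='ms' casts timestamp columns to millisecond resolution
# on write. The pandas metadata checked above lives under the b'pandas'
# schema metadata key and is what allows pq.read_pandas() to reconstruct
# the original DataFrame, index included.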


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_parquet_1_0_roundtrip(tempdir, use_legacy_dataset):
    size = 10000
    np.random.seed(0)
    df = pd.DataFrame({
        'uint8': np.arange(size, dtype=np.uint8),
        'uint16': np.arange(size, dtype=np.uint16),
        'uint32': np.arange(size, dtype=np.uint32),
        'uint64': np.arange(size, dtype=np.uint64),
        'int8': np.arange(size, dtype=np.int8),
        'int16': np.arange(size, dtype=np.int16),
        'int32': np.arange(size, dtype=np.int32),
        'int64': np.arange(size, dtype=np.int64),
        'float32': np.arange(size, dtype=np.float32),
        'float64': np.arange(size, dtype=np.float64),
        'bool': np.random.randn(size) > 0,
        'str': [str(x) for x in range(size)],
        'str_with_nulls': [None] + [str(x) for x in range(size - 2)] + [None],
        'empty_str': [''] * size
    })
    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df)
    _write_table(arrow_table, filename, version='1.0')
    table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
    df_read = table_read.to_pandas()

    # We pass uint32_t as int64_t if we write Parquet version 1.0
    df['uint32'] = df['uint32'].values.astype(np.int64)

    tm.assert_frame_equal(df, df_read)


# Dictionary
# -----------------------------------------------------------------------------


def _simple_table_write_read(table, use_legacy_dataset):
    bio = pa.BufferOutputStream()
    pq.write_table(table, bio)
    contents = bio.getvalue()
    return pq.read_table(
        pa.BufferReader(contents), use_legacy_dataset=use_legacy_dataset
    )
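
# Illustrative use of the helper above (hypothetical table, not part of
# the test suite):
#
#   table = pa.table({'x': [1, 2, 3]})
#   assert _simple_table_write_read(table, False).equals(table)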


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_direct_read_dictionary(use_legacy_dataset):
    # ARROW-3325
    repeats = 10
    nunique = 5

    data = [
        [util.rands(10) for i in range(nunique)] * repeats,
    ]
    table = pa.table(data, names=['f0'])

    bio = pa.BufferOutputStream()
    pq.write_table(table, bio)
    contents = bio.getvalue()

    result = pq.read_table(pa.BufferReader(contents),
                           read_dictionary=['f0'],
                           use_legacy_dataset=use_legacy_dataset)

    # Compute dictionary-encoded subfield
    expected = pa.table([table[0].dictionary_encode()], names=['f0'])
    assert result.equals(expected)
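
# Note: read_dictionary asks the reader to return the named columns as
# DictionaryArray, decoding straight from Parquet's dictionary-encoded
# pages rather than materializing dense values first.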


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_direct_read_dictionary_subfield(use_legacy_dataset):
    repeats = 10
    nunique = 5

    data = [
        [[util.rands(10)] for i in range(nunique)] * repeats,
    ]
    table = pa.table(data, names=['f0'])

    bio = pa.BufferOutputStream()
    pq.write_table(table, bio)
    contents = bio.getvalue()
    result = pq.read_table(pa.BufferReader(contents),
                           read_dictionary=['f0.list.item'],
                           use_legacy_dataset=use_legacy_dataset)

    arr = pa.array(data[0])
    values_as_dict = arr.values.dictionary_encode()

    inner_indices = values_as_dict.indices.cast('int32')
    new_values = pa.DictionaryArray.from_arrays(inner_indices,
                                                values_as_dict.dictionary)

    # nunique * repeats = 50 single-element lists, hence 51 offsets
    offsets = pa.array(range(51), type='int32')
    expected_arr = pa.ListArray.from_arrays(offsets, new_values)
    expected = pa.table([expected_arr], names=['f0'])

    assert result.equals(expected)
    assert result[0].num_chunks == 1


@parametrize_legacy_dataset
def test_dictionary_array_automatically_read(use_legacy_dataset):
    # ARROW-3246

    # Make a large dictionary, a little over 4MB of data
    dict_length = 4000
    dict_values = pa.array([('x' * 1000 + '_{}'.format(i))
                            for i in range(dict_length)])

    num_chunks = 10
    chunk_size = 100
    chunks = []
    for i in range(num_chunks):
        indices = np.random.randint(0, dict_length,
                                    size=chunk_size).astype(np.int32)
        chunks.append(pa.DictionaryArray.from_arrays(pa.array(indices),
                                                     dict_values))

    table = pa.table([pa.chunked_array(chunks)], names=['f0'])
    result = _simple_table_write_read(table, use_legacy_dataset)

    assert result.equals(table)

    # The only key in the metadata was the Arrow schema key
    assert result.schema.metadata is None
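
# Note: the dictionary type is restored automatically here because the
# serialized Arrow schema stored in the file's key-value metadata records
# that 'f0' was dictionary-encoded; that schema key is consumed on read,
# which is why result.schema.metadata is None above.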


# Decimal
# -----------------------------------------------------------------------------


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_decimal_roundtrip(tempdir, use_legacy_dataset):
    num_values = 10

    columns = {}
    for precision in range(1, 39):
        for scale in range(0, precision + 1):
            with util.random_seed(0):
                random_decimal_values = [
                    util.randdecimal(precision, scale)
                    for _ in range(num_values)
                ]
            column_name = ('dec_precision_{:d}_scale_{:d}'
                           .format(precision, scale))
            columns[column_name] = random_decimal_values

    expected = pd.DataFrame(columns)
    filename = tempdir / 'decimals.parquet'
    string_filename = str(filename)
    table = pa.Table.from_pandas(expected)
    _write_table(table, string_filename)
    result_table = _read_table(
        string_filename, use_legacy_dataset=use_legacy_dataset)
    result = result_table.to_pandas()
    tm.assert_frame_equal(result, expected)
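
# Note: the Parquet format stores DECIMAL values as INT32, INT64,
# BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY depending on precision; the roundtrip
# above only verifies that the logical values survive, not which physical
# encoding the writer chose.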


@pytest.mark.pandas
@pytest.mark.xfail(
    raises=OSError, reason='Parquet does not support negative scale'
)
def test_decimal_roundtrip_negative_scale(tempdir):
    expected = pd.DataFrame({'decimal_num': [decimal.Decimal('1.23E4')]})
    filename = tempdir / 'decimals.parquet'
    string_filename = str(filename)
    t = pa.Table.from_pandas(expected)
    _write_table(t, string_filename)
    result_table = _read_table(string_filename)
    result = result_table.to_pandas()
    tm.assert_frame_equal(result, expected)


# List types
# -----------------------------------------------------------------------------


@parametrize_legacy_dataset
@pytest.mark.parametrize('dtype', [int, float])
def test_single_pylist_column_roundtrip(tempdir, dtype, use_legacy_dataset):
    filename = tempdir / 'single_{}_column.parquet'.format(dtype.__name__)
    data = [pa.array(list(map(dtype, range(5))))]
    table = pa.Table.from_arrays(data, names=['a'])
    _write_table(table, filename)
    table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
    for i in range(table.num_columns):
        col_written = table[i]
        col_read = table_read[i]
        assert table.field(i).name == table_read.field(i).name
        assert col_read.num_chunks == 1
        data_written = col_written.chunk(0)
        data_read = col_read.chunk(0)
        assert data_written.equals(data_read)


@parametrize_legacy_dataset
def test_empty_lists_table_roundtrip(use_legacy_dataset):
    # ARROW-2744: Shouldn't crash when writing an array of empty lists
    arr = pa.array([[], []], type=pa.list_(pa.int32()))
    table = pa.Table.from_arrays([arr], ["A"])
    _check_roundtrip(table, use_legacy_dataset=use_legacy_dataset)


@parametrize_legacy_dataset
def test_nested_list_nonnullable_roundtrip_bug(use_legacy_dataset):
    # Reproduce failure in ARROW-5630
    typ = pa.list_(pa.field("item", pa.float32(), False))
    num_rows = 10000
    t = pa.table([
        pa.array(([[0] * ((i + 5) % 10) for i in range(0, 10)] *
                  (num_rows // 10)), type=typ)
    ], ['a'])
    _check_roundtrip(
        t, data_page_size=4096, use_legacy_dataset=use_legacy_dataset)
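
# Note: data_page_size=4096 forces the writer to emit many small data
# pages, so the list repetition/definition levels are split across page
# boundaries; splitting at such boundaries is the condition this
# regression test exercises.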


@parametrize_legacy_dataset
def test_nested_list_struct_multiple_batches_roundtrip(
    tempdir, use_legacy_dataset
):
    # Reproduce failure in ARROW-11024
    data = [[{'x': 'abc', 'y': 'abc'}]]*100 + [[{'x': 'abc', 'y': 'gcb'}]]*100
    table = pa.table([pa.array(data)], names=['column'])
    _check_roundtrip(
        table, row_group_size=20, use_legacy_dataset=use_legacy_dataset)

    # Reproduce failure in ARROW-11069 (plain non-nested structs with strings)
    data = pa.array(
        [{'a': '1', 'b': '2'}, {'a': '3', 'b': '4'}, {'a': '5', 'b': '6'}]*10
    )
    table = pa.table({'column': data})
    _check_roundtrip(
        table, row_group_size=10, use_legacy_dataset=use_legacy_dataset)


def test_writing_empty_lists():
    # ARROW-2591: [Python] Segmentation fault issue in pq.write_table
    arr1 = pa.array([[], []], pa.list_(pa.int32()))
    table = pa.Table.from_arrays([arr1], ['list(int32)'])
    _check_roundtrip(table)


@pytest.mark.pandas
def test_column_of_arrays(tempdir):
    df, schema = dataframe_with_arrays()

    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df, schema=schema)
    _write_table(arrow_table, filename, version='2.6', coerce_timestamps='ms')
    table_read = _read_table(filename)
    df_read = table_read.to_pandas()
    tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
def test_column_of_lists(tempdir):
    df, schema = dataframe_with_lists(parquet_compatible=True)

    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df, schema=schema)
    _write_table(arrow_table, filename, version='2.6')
    table_read = _read_table(filename)
    df_read = table_read.to_pandas()

    tm.assert_frame_equal(df, df_read)


def test_large_list_records():
    # This was fixed in PARQUET-1100

    list_lengths = np.random.randint(0, 500, size=50)
    list_lengths[::10] = 0

    list_values = [list(map(int, np.random.randint(0, 100, size=x)))
                   if i % 8 else None
                   for i, x in enumerate(list_lengths)]

    a1 = pa.array(list_values)

    table = pa.Table.from_arrays([a1], ['int_lists'])
    _check_roundtrip(table)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_parquet_nested_convenience(tempdir, use_legacy_dataset):
    # ARROW-1684
    df = pd.DataFrame({
        'a': [[1, 2, 3], None, [4, 5], []],
        'b': [[1.], None, None, [6., 7.]],
    })

    path = str(tempdir / 'nested_convenience.parquet')

    table = pa.Table.from_pandas(df, preserve_index=False)
    _write_table(table, path)

    read = pq.read_table(
        path, columns=['a'], use_legacy_dataset=use_legacy_dataset)
    tm.assert_frame_equal(read.to_pandas(), df[['a']])

    read = pq.read_table(
        path, columns=['a', 'b'], use_legacy_dataset=use_legacy_dataset)
    tm.assert_frame_equal(read.to_pandas(), df)


# Binary
# -----------------------------------------------------------------------------


def test_fixed_size_binary():
    t0 = pa.binary(10)
    data = [b'fooooooooo', None, b'barooooooo', b'quxooooooo']
    a0 = pa.array(data, type=t0)

    table = pa.Table.from_arrays([a0],
                                 ['binary[10]'])
    _check_roundtrip(table)


# Large types
# -----------------------------------------------------------------------------


@pytest.mark.slow
@pytest.mark.large_memory
def test_large_table_int32_overflow():
    size = np.iinfo('int32').max + 1

    arr = np.ones(size, dtype='uint8')

    parr = pa.array(arr, type=pa.uint8())

    table = pa.Table.from_arrays([parr], names=['one'])
    f = io.BytesIO()
    _write_table(table, f)


def _simple_table_roundtrip(table, use_legacy_dataset=False, **write_kwargs):
    stream = pa.BufferOutputStream()
    _write_table(table, stream, **write_kwargs)
    buf = stream.getvalue()
    return _read_table(buf, use_legacy_dataset=use_legacy_dataset)
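
# Like _simple_table_write_read above, but routed through _write_table /
# _read_table so callers can forward writer options. Illustrative call:
#
#   result = _simple_table_roundtrip(table, use_dictionary=False)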


@pytest.mark.slow
@pytest.mark.large_memory
@parametrize_legacy_dataset
def test_byte_array_exactly_2gb(use_legacy_dataset):
    # Test edge case reported in ARROW-3762
    val = b'x' * (1 << 10)

    # (2^21 - 1) values of 1024 bytes each: 2^31 - 1024 bytes in total
    base = pa.array([val] * ((1 << 21) - 1))
    cases = [
        [b'x' * 1023],  # brings the total to 2^31 - 1
        [b'x' * 1024],  # 2^31
        [b'x' * 1025]   # 2^31 + 1
    ]
    for case in cases:
        values = pa.chunked_array([base, pa.array(case)])
        t = pa.table([values], names=['f0'])
        result = _simple_table_roundtrip(
            t, use_legacy_dataset=use_legacy_dataset, use_dictionary=False)
        assert t.equals(result)


@pytest.mark.slow
@pytest.mark.pandas
@pytest.mark.large_memory
@parametrize_legacy_dataset
def test_binary_array_overflow_to_chunked(use_legacy_dataset):
    # ARROW-3762

    # 2^31 + 1 bytes
    values = [b'x'] + [
        b'x' * (1 << 20)
    ] * 2 * (1 << 10)
    df = pd.DataFrame({'byte_col': values})

    tbl = pa.Table.from_pandas(df, preserve_index=False)
    read_tbl = _simple_table_roundtrip(
        tbl, use_legacy_dataset=use_legacy_dataset)

    col0_data = read_tbl[0]
    assert isinstance(col0_data, pa.ChunkedArray)

    # Split into two chunks, each under the 2GB offset limit
    assert col0_data.num_chunks == 2

    assert tbl.equals(read_tbl)
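
# Note: Arrow's regular binary/string arrays index their character data
# with 32-bit offsets, so a single array can address at most 2**31 - 1
# bytes. A column whose total payload exceeds that, as above, therefore
# has to come back as a ChunkedArray with more than one chunk.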


@pytest.mark.slow
@pytest.mark.pandas
@pytest.mark.large_memory
@parametrize_legacy_dataset
def test_list_of_binary_large_cell(use_legacy_dataset):
    # ARROW-4688
    data = []

    # TODO(wesm): handle chunked children
    # 2^31 - 1 bytes in a single cell
    # data.append([b'x' * (1 << 20)] * 2047 + [b'x' * ((1 << 20) - 1)])

    # A little under 2GB in total, in cells of approximately 10MB each
    data.extend([[b'x' * 1000000] * 10] * 214)

    arr = pa.array(data)
    table = pa.Table.from_arrays([arr], ['chunky_cells'])
    read_table = _simple_table_roundtrip(
        table, use_legacy_dataset=use_legacy_dataset)
    assert table.equals(read_table)


def test_large_binary():
    data = [b'foo', b'bar'] * 50
    for type in [pa.large_binary(), pa.large_string()]:
        arr = pa.array(data, type=type)
        table = pa.Table.from_arrays([arr], names=['strs'])
        for use_dictionary in [False, True]:
            _check_roundtrip(table, use_dictionary=use_dictionary)
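
# Note: large_binary/large_string use 64-bit value offsets, so a single
# array can hold more than 2GB of data. Parquet itself still rejects any
# individual value of 2GB or more, which test_large_binary_overflow below
# exercises.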


@pytest.mark.slow
@pytest.mark.large_memory
def test_large_binary_huge():
    s = b'xy' * 997
    data = [s] * ((1 << 33) // len(s))
    for type in [pa.large_binary(), pa.large_string()]:
        arr = pa.array(data, type=type)
        table = pa.Table.from_arrays([arr], names=['strs'])
        for use_dictionary in [False, True]:
            _check_roundtrip(table, use_dictionary=use_dictionary)
        del arr, table


@pytest.mark.large_memory
def test_large_binary_overflow():
    s = b'x' * (1 << 31)
    arr = pa.array([s], type=pa.large_binary())
    table = pa.Table.from_arrays([arr], names=['strs'])
    for use_dictionary in [False, True]:
        writer = pa.BufferOutputStream()
        with pytest.raises(
                pa.ArrowInvalid,
                match="Parquet cannot store strings with size 2GB or more"):
            _write_table(table, writer, use_dictionary=use_dictionary)