1 # Licensed to the Apache Software Foundation (ASF) under one
2 # or more contributor license agreements. See the NOTICE file
3 # distributed with this work for additional information
4 # regarding copyright ownership. The ASF licenses this file
5 # to you under the Apache License, Version 2.0 (the
6 # "License"); you may not use this file except in compliance
7 # with the License. You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing,
12 # software distributed under the License is distributed on an
13 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 # KIND, either express or implied. See the License for the
15 # specific language governing permissions and limitations
import datetime
import io

import numpy as np
import pytest

import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow.tests.parquet.common import (_check_roundtrip, _read_table,
                                          _roundtrip_pandas_dataframe,
                                          _write_table,
                                          parametrize_legacy_dataset)

import pandas as pd
import pandas.testing as tm
# Marks all of the tests in this module: only run when parquet support
# is enabled (pytest -m parquet).
pytestmark = pytest.mark.parquet
@parametrize_legacy_dataset
def test_pandas_parquet_datetime_tz(use_legacy_dataset):
    """Round-trip tz-aware timestamps through parquet, as both a column
    and an index, and check the timezone survives.
    """
    s = pd.Series([datetime.datetime(2017, 9, 6)])
    s = s.dt.tz_localize('utc')

    # Both a column and an index to hit both use cases
    df = pd.DataFrame({'tz_aware': s,
                       'tz_eastern': s.dt.tz_convert('US/Eastern')},
                      index=s)

    # NOTE(review): the in-memory buffer and seek were dropped by the
    # extraction; reconstructed from the upstream test — verify.
    f = io.BytesIO()

    arrow_table = pa.Table.from_pandas(df)

    # coerce to ms so the values are exactly representable on disk
    _write_table(arrow_table, f, coerce_timestamps='ms')
    f.seek(0)

    table_read = pq.read_pandas(f, use_legacy_dataset=use_legacy_dataset)

    df_read = table_read.to_pandas()
    tm.assert_frame_equal(df, df_read)
@parametrize_legacy_dataset
def test_datetime_timezone_tzinfo(use_legacy_dataset):
    """Round-trip a datetime carrying a stdlib ``datetime.timezone`` tzinfo
    (as opposed to a pytz/dateutil zone).
    """
    value = datetime.datetime(2018, 1, 1, 1, 23, 45,
                              tzinfo=datetime.timezone.utc)
    df = pd.DataFrame({'foo': [value]})

    _roundtrip_pandas_dataframe(
        df, write_kwargs={}, use_legacy_dataset=use_legacy_dataset)
def test_coerce_timestamps(tempdir):
    """Writing with ``coerce_timestamps='us'`` converts ms data to us on
    read-back; an unknown unit raises ``ValueError``.
    """
    from collections import OrderedDict

    # a list<timestamp[ms]> column, with some null entries
    arrays = OrderedDict()
    fields = [pa.field('datetime64',
                       pa.list_(pa.timestamp('ms')))]
    # NOTE(review): the None entries and the list closing were dropped by
    # the extraction; reconstructed from the upstream test — verify.
    arrays['datetime64'] = [
        np.array(['2007-07-13T01:23:34.123456789',
                  None,
                  '2010-08-13T05:46:57.437699912'],
                 dtype='datetime64[ms]'),
        None,
        None,
        np.array(['2007-07-13T02',
                  None,
                  '2010-08-13T05:46:57.437699912'],
                 dtype='datetime64[ms]'),
    ]

    df = pd.DataFrame(arrays)
    schema = pa.schema(fields)

    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df, schema=schema)

    _write_table(arrow_table, filename, version='2.6', coerce_timestamps='us')
    table_read = _read_table(filename)
    df_read = table_read.to_pandas()

    # the read-back frame should match the original coerced to microseconds
    df_expected = df.copy()
    for i, x in enumerate(df_expected['datetime64']):
        if isinstance(x, np.ndarray):
            df_expected['datetime64'][i] = x.astype('M8[us]')

    tm.assert_frame_equal(df_expected, df_read)

    # an unrecognized coercion unit is rejected
    with pytest.raises(ValueError):
        _write_table(arrow_table, filename, version='2.6',
                     coerce_timestamps='unknown')
def test_coerce_timestamps_truncated(tempdir):
    """
    ARROW-2555: Test that we can truncate timestamps when coercing if
    explicitly allowed.
    """
    # dt_us has a 1-microsecond component that 'ms' coercion must truncate
    dt_us = datetime.datetime(year=2017, month=1, day=1, hour=1, minute=1,
                              second=1, microsecond=1)
    dt_ms = datetime.datetime(year=2017, month=1, day=1, hour=1, minute=1,
                              second=1)

    fields_us = [pa.field('datetime64', pa.timestamp('us'))]
    arrays_us = {'datetime64': [dt_us, dt_ms]}

    df_us = pd.DataFrame(arrays_us)
    schema_us = pa.schema(fields_us)

    filename = tempdir / 'pandas_truncated.parquet'
    table_us = pa.Table.from_pandas(df_us, schema=schema_us)

    # allow_truncated_timestamps=True suppresses the lossy-coercion error
    _write_table(table_us, filename, version='2.6', coerce_timestamps='ms',
                 allow_truncated_timestamps=True)
    table_ms = _read_table(filename)
    df_ms = table_ms.to_pandas()

    # both values collapse to millisecond precision
    arrays_expected = {'datetime64': [dt_ms, dt_ms]}
    df_expected = pd.DataFrame(arrays_expected)
    tm.assert_frame_equal(df_expected, df_ms)
def test_date_time_types(tempdir):
    """Round-trip all date/time/timestamp physical types, including the
    type adjustments parquet performs (date64 -> date32, time32[s] ->
    time32[ms]) and INT96 storage for nanosecond timestamps.

    NOTE(review): several dropped lines (type constructors, column-name
    lists, ``flavor='spark'``, the schema-check loops) were reconstructed
    from the upstream pyarrow test suite — verify against it.
    """
    t1 = pa.date32()
    data1 = np.array([17259, 17260, 17261], dtype='int32')
    a1 = pa.array(data1, type=t1)

    t2 = pa.date64()
    data2 = data1.astype('int64') * 86400000  # days -> milliseconds
    a2 = pa.array(data2, type=t2)

    t3 = pa.timestamp('us')
    start = pd.Timestamp('2001-01-01').value / 1000  # ns -> us
    data3 = np.array([start, start + 1, start + 2], dtype='int64')
    a3 = pa.array(data3, type=t3)

    t4 = pa.time32('ms')
    data4 = np.arange(3, dtype='i4')
    a4 = pa.array(data4, type=t4)

    t5 = pa.time64('us')
    a5 = pa.array(data4.astype('int64'), type=t5)

    t6 = pa.time32('s')
    a6 = pa.array(data4, type=t6)

    # parquet has no time32[s]; it is widened to time32[ms] on round-trip
    ex_t6 = pa.time32('ms')
    ex_a6 = pa.array(data4 * 1000, type=ex_t6)

    t7 = pa.timestamp('ns')
    start = pd.Timestamp('2001-01-01').value
    data7 = np.array([start, start + 1000, start + 2000],
                     dtype='int64')
    a7 = pa.array(data7, type=t7)

    table = pa.Table.from_arrays([a1, a2, a3, a4, a5, a6, a7],
                                 ['date32', 'date64', 'timestamp[us]',
                                  'time32[s]', 'time64[us]',
                                  'time32_from64[s]',
                                  'timestamp[ns]'])

    # date64 is read back as date32,
    # time32[s] to time32[ms]
    expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7],
                                    ['date32', 'date64', 'timestamp[us]',
                                     'time32[s]', 'time64[us]',
                                     'time32_from64[s]',
                                     'timestamp[ns]'])

    _check_roundtrip(table, expected=expected, version='2.6')

    t0 = pa.timestamp('ms')
    data0 = np.arange(4, dtype='int64')
    a0 = pa.array(data0, type=t0)

    t1 = pa.timestamp('us')
    data1 = np.arange(4, dtype='int64')
    a1 = pa.array(data1, type=t1)

    t2 = pa.timestamp('ns')
    data2 = np.arange(4, dtype='int64')
    a2 = pa.array(data2, type=t2)

    table = pa.Table.from_arrays([a0, a1, a2],
                                 ['ts[ms]', 'ts[us]', 'ts[ns]'])
    expected = pa.Table.from_arrays([a0, a1, a2],
                                    ['ts[ms]', 'ts[us]', 'ts[ns]'])

    # int64 for all timestamps supported by default
    filename = tempdir / 'int64_timestamps.parquet'
    _write_table(table, filename, version='2.6')
    parquet_schema = pq.ParquetFile(filename).schema
    for i in range(3):
        assert parquet_schema.column(i).physical_type == 'INT64'
    read_table = _read_table(filename)
    assert read_table.equals(expected)

    # ns-unit versions of the ms and us columns for the INT96 cases below
    t0_ns = pa.timestamp('ns')
    data0_ns = np.array(data0 * 1000000, dtype='int64')
    a0_ns = pa.array(data0_ns, type=t0_ns)

    t1_ns = pa.timestamp('ns')
    data1_ns = np.array(data1 * 1000, dtype='int64')
    a1_ns = pa.array(data1_ns, type=t1_ns)

    expected = pa.Table.from_arrays([a0_ns, a1_ns, a2],
                                    ['ts[ms]', 'ts[us]', 'ts[ns]'])

    # int96 nanosecond timestamps produced upon request
    filename = tempdir / 'explicit_int96_timestamps.parquet'
    _write_table(table, filename, version='2.6',
                 use_deprecated_int96_timestamps=True)
    parquet_schema = pq.ParquetFile(filename).schema
    for i in range(3):
        assert parquet_schema.column(i).physical_type == 'INT96'
    read_table = _read_table(filename)
    assert read_table.equals(expected)

    # int96 nanosecond timestamps implied by flavor 'spark'
    filename = tempdir / 'spark_int96_timestamps.parquet'
    _write_table(table, filename, version='2.6',
                 flavor='spark')
    parquet_schema = pq.ParquetFile(filename).schema
    for i in range(3):
        assert parquet_schema.column(i).physical_type == 'INT96'
    read_table = _read_table(filename)
    assert read_table.equals(expected)
@pytest.mark.parametrize('unit', ['s', 'ms', 'us', 'ns'])
def test_coerce_int96_timestamp_unit(unit):
    """INT96 timestamps can be read back at any requested resolution via
    ``coerce_int96_timestamp_unit``.
    """
    i_s = pd.Timestamp('2010-01-01').value / 1000000000  # := 1262304000

    d_s = np.arange(i_s, i_s + 10, 1, dtype='int64')
    # NOTE(review): derived-unit arrays were dropped by the extraction;
    # reconstructed as successive x1000 scalings — verify.
    d_ms = d_s * 1000
    d_us = d_ms * 1000
    d_ns = d_us * 1000

    a_s = pa.array(d_s, type=pa.timestamp('s'))
    a_ms = pa.array(d_ms, type=pa.timestamp('ms'))
    a_us = pa.array(d_us, type=pa.timestamp('us'))
    a_ns = pa.array(d_ns, type=pa.timestamp('ns'))

    arrays = {"s": a_s, "ms": a_ms, "us": a_us, "ns": a_ns}
    names = ['ts_s', 'ts_ms', 'ts_us', 'ts_ns']
    table = pa.Table.from_arrays([a_s, a_ms, a_us, a_ns], names)

    # For either Parquet version, coercing to nanoseconds is allowed
    # if Int96 storage is used
    expected = pa.Table.from_arrays([arrays.get(unit)]*4, names)
    read_table_kwargs = {"coerce_int96_timestamp_unit": unit}
    _check_roundtrip(table, expected,
                     read_table_kwargs=read_table_kwargs,
                     use_deprecated_int96_timestamps=True)
    _check_roundtrip(table, expected, version='2.6',
                     read_table_kwargs=read_table_kwargs,
                     use_deprecated_int96_timestamps=True)
@pytest.mark.parametrize('pq_reader_method', ['ParquetFile', 'read_table'])
def test_coerce_int96_timestamp_overflow(pq_reader_method, tempdir):
    """ARROW-12096: out-of-ns-range INT96 timestamps overflow when read at
    the default ns resolution, but survive with an explicit coarser unit.
    """
    def get_table(pq_reader_method, filename, **kwargs):
        # dispatch to the reader variant under test
        if pq_reader_method == "ParquetFile":
            return pq.ParquetFile(filename, **kwargs).read()
        elif pq_reader_method == "read_table":
            return pq.read_table(filename, **kwargs)

    # Recreating the initial JIRA issue referrenced in ARROW-12096
    # (years 1000 and 3000 lie outside the nanosecond-timestamp range)
    oob_dts = [
        datetime.datetime(1000, 1, 1),
        datetime.datetime(2000, 1, 1),
        datetime.datetime(3000, 1, 1)
    ]
    df = pd.DataFrame({"a": oob_dts})
    table = pa.table(df)

    filename = tempdir / "test_round_trip_overflow.parquet"
    pq.write_table(table, filename, use_deprecated_int96_timestamps=True,
                   version="1.0")

    # with the default resolution of ns, we get wrong values for INT96
    # that are out of bounds for nanosecond range
    tab_error = get_table(pq_reader_method, filename)
    assert tab_error["a"].to_pylist() != oob_dts

    # avoid this overflow by specifying the resolution to use for INT96 values
    tab_correct = get_table(
        pq_reader_method, filename, coerce_int96_timestamp_unit="s"
    )
    df_correct = tab_correct.to_pandas(timestamp_as_object=True)
    tm.assert_frame_equal(df, df_correct)
def test_timestamp_restore_timezone():
    # ARROW-5888, restore timezone from serialized metadata
    ty = pa.timestamp('ms', tz='America/New_York')
    arr = pa.array([1, 2, 3], type=ty)
    t = pa.table([arr], names=['f0'])
    # NOTE(review): the round-trip call was dropped by the extraction;
    # reconstructed from the upstream test — verify.
    _check_roundtrip(t)
def test_timestamp_restore_timezone_nanosecond():
    # ARROW-9634, also restore timezone for nanosecond data that get stored
    # as microseconds in the parquet file
    ty = pa.timestamp('ns', tz='America/New_York')
    arr = pa.array([1000, 2000, 3000], type=ty)
    table = pa.table([arr], names=['f0'])
    # the default writer stores ns as us, so expect a us-typed column back
    ty_us = pa.timestamp('us', tz='America/New_York')
    expected = pa.table([arr.cast(ty_us)], names=['f0'])
    _check_roundtrip(table, expected=expected)
def test_list_of_datetime_time_roundtrip():
    """Round-trip a column holding a list of ``datetime.time`` values.

    NOTE(review): the tail of the time-string list was dropped by the
    extraction; reconstructed from the upstream test — verify.
    """
    times = pd.to_datetime(['09:00', '09:30', '10:00', '10:30', '11:00',
                            '11:30', '12:00'])
    df = pd.DataFrame({'time': [times.time]})
    _roundtrip_pandas_dataframe(df, write_kwargs={})
def test_parquet_version_timestamp_differences():
    """Default and explicit timestamp coercion behavior differs between
    Parquet format versions 1.0 and 2.x; check each combination.
    """
    i_s = pd.Timestamp('2010-01-01').value / 1000000000  # := 1262304000

    d_s = np.arange(i_s, i_s + 10, 1, dtype='int64')
    # NOTE(review): derived-unit arrays were dropped by the extraction;
    # reconstructed as successive x1000 scalings — verify.
    d_ms = d_s * 1000
    d_us = d_ms * 1000
    d_ns = d_us * 1000

    a_s = pa.array(d_s, type=pa.timestamp('s'))
    a_ms = pa.array(d_ms, type=pa.timestamp('ms'))
    a_us = pa.array(d_us, type=pa.timestamp('us'))
    a_ns = pa.array(d_ns, type=pa.timestamp('ns'))

    names = ['ts:s', 'ts:ms', 'ts:us', 'ts:ns']
    table = pa.Table.from_arrays([a_s, a_ms, a_us, a_ns], names)

    # Using Parquet version 1.0, seconds should be coerced to milliseconds
    # and nanoseconds should be coerced to microseconds by default
    expected = pa.Table.from_arrays([a_ms, a_ms, a_us, a_us], names)
    _check_roundtrip(table, expected)

    # Using Parquet version 2.0, seconds should be coerced to milliseconds
    # and nanoseconds should be retained by default
    expected = pa.Table.from_arrays([a_ms, a_ms, a_us, a_ns], names)
    _check_roundtrip(table, expected, version='2.6')

    # Using Parquet version 1.0, coercing to milliseconds or microseconds
    # is allowed
    expected = pa.Table.from_arrays([a_ms, a_ms, a_ms, a_ms], names)
    _check_roundtrip(table, expected, coerce_timestamps='ms')

    # Using Parquet version 2.0, coercing to milliseconds or microseconds
    # is allowed
    expected = pa.Table.from_arrays([a_us, a_us, a_us, a_us], names)
    _check_roundtrip(table, expected, version='2.6', coerce_timestamps='us')

    # TODO: after pyarrow allows coerce_timestamps='ns', tests like the
    # following should pass ...

    # Using Parquet version 1.0, coercing to nanoseconds is not allowed
    # with pytest.raises(NotImplementedError):
    #     _roundtrip_table(table, coerce_timestamps='ns')

    # Using Parquet version 2.0, coercing to nanoseconds is allowed
    # expected = pa.Table.from_arrays([a_ns, a_ns, a_ns, a_ns], names)
    # _check_roundtrip(table, expected, version='2.6', coerce_timestamps='ns')

    # For either Parquet version, coercing to nanoseconds is allowed
    # if Int96 storage is used
    expected = pa.Table.from_arrays([a_ns, a_ns, a_ns, a_ns], names)
    _check_roundtrip(table, expected,
                     use_deprecated_int96_timestamps=True)
    _check_roundtrip(table, expected, version='2.6',
                     use_deprecated_int96_timestamps=True)
def test_noncoerced_nanoseconds_written_without_exception(tempdir):
    # ARROW-1957: the Parquet version 2.0 writer preserves Arrow
    # nanosecond timestamps by default
    # NOTE(review): the definition of `n` was dropped by the extraction;
    # upstream uses n = 9 — verify.
    n = 9
    df = pd.DataFrame({'x': range(n)},
                      index=pd.date_range('2017-01-01', freq='1n', periods=n))
    tb = pa.Table.from_pandas(df)

    filename = tempdir / 'written.parquet'
    pq.write_table(tb, filename, version='2.6')
    assert filename.exists()

    recovered_table = pq.read_table(filename)
    assert tb.equals(recovered_table)

    # Loss of data through coercion (without explicit override) still an error
    filename = tempdir / 'not_written.parquet'
    with pytest.raises(ValueError):
        pq.write_table(tb, filename, coerce_timestamps='ms', version='2.6')