# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import datetime
import io

import numpy as np
import pytest

import pyarrow as pa
from pyarrow.tests.parquet.common import (
    _check_roundtrip, parametrize_legacy_dataset)

try:
    import pyarrow.parquet as pq
    from pyarrow.tests.parquet.common import _read_table, _write_table
except ImportError:
    pq = None


try:
    import pandas as pd
    import pandas.testing as tm

    from pyarrow.tests.parquet.common import _roundtrip_pandas_dataframe
except ImportError:
    pd = tm = None


pytestmark = pytest.mark.parquet


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_pandas_parquet_datetime_tz(use_legacy_dataset):
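    # Timezone metadata (UTC and US/Eastern) must survive the roundtrip.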
    s = pd.Series([datetime.datetime(2017, 9, 6)])
    s = s.dt.tz_localize('utc')

    s.index = s

    # Both a column and an index to hit both use cases
    df = pd.DataFrame({'tz_aware': s,
                       'tz_eastern': s.dt.tz_convert('US/Eastern')},
                      index=s)

    f = io.BytesIO()

    arrow_table = pa.Table.from_pandas(df)

    _write_table(arrow_table, f, coerce_timestamps='ms')
    f.seek(0)

    table_read = pq.read_pandas(f, use_legacy_dataset=use_legacy_dataset)

    df_read = table_read.to_pandas()
    tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_datetime_timezone_tzinfo(use_legacy_dataset):
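    # A value whose tzinfo is the stdlib datetime.timezone.utc should
    # roundtrip unchanged.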
    value = datetime.datetime(2018, 1, 1, 1, 23, 45,
                              tzinfo=datetime.timezone.utc)
    df = pd.DataFrame({'foo': [value]})

    _roundtrip_pandas_dataframe(
        df, write_kwargs={}, use_legacy_dataset=use_legacy_dataset)


@pytest.mark.pandas
def test_coerce_timestamps(tempdir):
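    # Lists of ms-resolution timestamps are upcast to microseconds when
    # coerce_timestamps='us'; an unknown unit must raise ValueError.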
    from collections import OrderedDict

    # ARROW-622
    arrays = OrderedDict()
    fields = [pa.field('datetime64',
                       pa.list_(pa.timestamp('ms')))]
    arrays['datetime64'] = [
        np.array(['2007-07-13T01:23:34.123456789',
                  None,
                  '2010-08-13T05:46:57.437699912'],
                 dtype='datetime64[ms]'),
        None,
        None,
        np.array(['2007-07-13T02',
                  None,
                  '2010-08-13T05:46:57.437699912'],
                 dtype='datetime64[ms]'),
    ]

    df = pd.DataFrame(arrays)
    schema = pa.schema(fields)

    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df, schema=schema)

    _write_table(arrow_table, filename, version='2.6', coerce_timestamps='us')
    table_read = _read_table(filename)
    df_read = table_read.to_pandas()

    df_expected = df.copy()
    for i, x in enumerate(df_expected['datetime64']):
        if isinstance(x, np.ndarray):
            df_expected['datetime64'][i] = x.astype('M8[us]')

    tm.assert_frame_equal(df_expected, df_read)

    with pytest.raises(ValueError):
        _write_table(arrow_table, filename, version='2.6',
                     coerce_timestamps='unknown')


@pytest.mark.pandas
def test_coerce_timestamps_truncated(tempdir):
    """
    ARROW-2555: Test that we can truncate timestamps when coercing if
    explicitly allowed.
    """
    dt_us = datetime.datetime(year=2017, month=1, day=1, hour=1, minute=1,
                              second=1, microsecond=1)
    dt_ms = datetime.datetime(year=2017, month=1, day=1, hour=1, minute=1,
                              second=1)

    fields_us = [pa.field('datetime64', pa.timestamp('us'))]
    arrays_us = {'datetime64': [dt_us, dt_ms]}

    df_us = pd.DataFrame(arrays_us)
    schema_us = pa.schema(fields_us)

    filename = tempdir / 'pandas_truncated.parquet'
    table_us = pa.Table.from_pandas(df_us, schema=schema_us)

    _write_table(table_us, filename, version='2.6', coerce_timestamps='ms',
                 allow_truncated_timestamps=True)
    table_ms = _read_table(filename)
    df_ms = table_ms.to_pandas()

    arrays_expected = {'datetime64': [dt_ms, dt_ms]}
    df_expected = pd.DataFrame(arrays_expected)
    tm.assert_frame_equal(df_expected, df_ms)


@pytest.mark.pandas
def test_date_time_types(tempdir):
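    # Exercise date32/date64, time32/time64 and timestamp roundtrips,
    # including the coercions expected for Parquet version 2.6.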
    t1 = pa.date32()
    data1 = np.array([17259, 17260, 17261], dtype='int32')
    a1 = pa.array(data1, type=t1)

    t2 = pa.date64()
    data2 = data1.astype('int64') * 86400000
    a2 = pa.array(data2, type=t2)

    t3 = pa.timestamp('us')
    start = pd.Timestamp('2001-01-01').value / 1000
    data3 = np.array([start, start + 1, start + 2], dtype='int64')
    a3 = pa.array(data3, type=t3)

    t4 = pa.time32('ms')
    data4 = np.arange(3, dtype='i4')
    a4 = pa.array(data4, type=t4)

    t5 = pa.time64('us')
    a5 = pa.array(data4.astype('int64'), type=t5)

    t6 = pa.time32('s')
    a6 = pa.array(data4, type=t6)

    ex_t6 = pa.time32('ms')
    ex_a6 = pa.array(data4 * 1000, type=ex_t6)

    t7 = pa.timestamp('ns')
    start = pd.Timestamp('2001-01-01').value
    data7 = np.array([start, start + 1000, start + 2000],
                     dtype='int64')
    a7 = pa.array(data7, type=t7)

    table = pa.Table.from_arrays([a1, a2, a3, a4, a5, a6, a7],
                                 ['date32', 'date64', 'timestamp[us]',
                                  'time32[s]', 'time64[us]',
                                  'time32_from64[s]',
                                  'timestamp[ns]'])

    # date64 as date32
    # time32[s] to time32[ms]
    expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7],
                                    ['date32', 'date64', 'timestamp[us]',
                                     'time32[s]', 'time64[us]',
                                     'time32_from64[s]',
                                     'timestamp[ns]'])

    _check_roundtrip(table, expected=expected, version='2.6')

    t0 = pa.timestamp('ms')
    data0 = np.arange(4, dtype='int64')
    a0 = pa.array(data0, type=t0)

    t1 = pa.timestamp('us')
    data1 = np.arange(4, dtype='int64')
    a1 = pa.array(data1, type=t1)

    t2 = pa.timestamp('ns')
    data2 = np.arange(4, dtype='int64')
    a2 = pa.array(data2, type=t2)

    table = pa.Table.from_arrays([a0, a1, a2],
                                 ['ts[ms]', 'ts[us]', 'ts[ns]'])
    expected = pa.Table.from_arrays([a0, a1, a2],
                                    ['ts[ms]', 'ts[us]', 'ts[ns]'])

    # int64 for all timestamps supported by default
    filename = tempdir / 'int64_timestamps.parquet'
    _write_table(table, filename, version='2.6')
    parquet_schema = pq.ParquetFile(filename).schema
    for i in range(3):
        assert parquet_schema.column(i).physical_type == 'INT64'
    read_table = _read_table(filename)
    assert read_table.equals(expected)

    t0_ns = pa.timestamp('ns')
    data0_ns = np.array(data0 * 1000000, dtype='int64')
    a0_ns = pa.array(data0_ns, type=t0_ns)

    t1_ns = pa.timestamp('ns')
    data1_ns = np.array(data1 * 1000, dtype='int64')
    a1_ns = pa.array(data1_ns, type=t1_ns)

    expected = pa.Table.from_arrays([a0_ns, a1_ns, a2],
                                    ['ts[ms]', 'ts[us]', 'ts[ns]'])

    # int96 nanosecond timestamps produced upon request
    filename = tempdir / 'explicit_int96_timestamps.parquet'
    _write_table(table, filename, version='2.6',
                 use_deprecated_int96_timestamps=True)
    parquet_schema = pq.ParquetFile(filename).schema
    for i in range(3):
        assert parquet_schema.column(i).physical_type == 'INT96'
    read_table = _read_table(filename)
    assert read_table.equals(expected)

    # int96 nanosecond timestamps implied by flavor 'spark'
    filename = tempdir / 'spark_int96_timestamps.parquet'
    _write_table(table, filename, version='2.6',
                 flavor='spark')
    parquet_schema = pq.ParquetFile(filename).schema
    for i in range(3):
        assert parquet_schema.column(i).physical_type == 'INT96'
    read_table = _read_table(filename)
    assert read_table.equals(expected)


@pytest.mark.pandas
@pytest.mark.parametrize('unit', ['s', 'ms', 'us', 'ns'])
def test_coerce_int96_timestamp_unit(unit):
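    # Build the same ten instants at second, milli-, micro- and nanosecond
    # resolution; each column should decode to the parametrized unit.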
    i_s = pd.Timestamp('2010-01-01').value / 1000000000  # := 1262304000

    d_s = np.arange(i_s, i_s + 10, 1, dtype='int64')
    d_ms = d_s * 1000
    d_us = d_ms * 1000
    d_ns = d_us * 1000

    a_s = pa.array(d_s, type=pa.timestamp('s'))
    a_ms = pa.array(d_ms, type=pa.timestamp('ms'))
    a_us = pa.array(d_us, type=pa.timestamp('us'))
    a_ns = pa.array(d_ns, type=pa.timestamp('ns'))

    arrays = {"s": a_s, "ms": a_ms, "us": a_us, "ns": a_ns}
    names = ['ts_s', 'ts_ms', 'ts_us', 'ts_ns']
    table = pa.Table.from_arrays([a_s, a_ms, a_us, a_ns], names)

    # For either Parquet version, INT96-stored values can be read back
    # coerced to the requested unit
    expected = pa.Table.from_arrays([arrays.get(unit)] * 4, names)
    read_table_kwargs = {"coerce_int96_timestamp_unit": unit}
    _check_roundtrip(table, expected,
                     read_table_kwargs=read_table_kwargs,
                     use_deprecated_int96_timestamps=True)
    _check_roundtrip(table, expected, version='2.6',
                     read_table_kwargs=read_table_kwargs,
                     use_deprecated_int96_timestamps=True)


@pytest.mark.pandas
@pytest.mark.parametrize('pq_reader_method', ['ParquetFile', 'read_table'])
def test_coerce_int96_timestamp_overflow(pq_reader_method, tempdir):
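    # ARROW-12096: INT96 data read at the default nanosecond unit overflows
    # for dates far outside the nanosecond-representable range.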

    def get_table(pq_reader_method, filename, **kwargs):
        if pq_reader_method == "ParquetFile":
            return pq.ParquetFile(filename, **kwargs).read()
        elif pq_reader_method == "read_table":
            return pq.read_table(filename, **kwargs)

    # Recreating the initial JIRA issue referenced in ARROW-12096
    oob_dts = [
        datetime.datetime(1000, 1, 1),
        datetime.datetime(2000, 1, 1),
        datetime.datetime(3000, 1, 1)
    ]
    df = pd.DataFrame({"a": oob_dts})
    table = pa.table(df)

    filename = tempdir / "test_round_trip_overflow.parquet"
    pq.write_table(table, filename, use_deprecated_int96_timestamps=True,
                   version="1.0")

    # with the default resolution of ns, we get wrong values for INT96
    # that are out of bounds for the nanosecond range
    tab_error = get_table(pq_reader_method, filename)
    assert tab_error["a"].to_pylist() != oob_dts

    # avoid this overflow by specifying the resolution to use for INT96 values
    tab_correct = get_table(
        pq_reader_method, filename, coerce_int96_timestamp_unit="s"
    )
    df_correct = tab_correct.to_pandas(timestamp_as_object=True)
    tm.assert_frame_equal(df, df_correct)


def test_timestamp_restore_timezone():
    # ARROW-5888, restore timezone from serialized metadata
    ty = pa.timestamp('ms', tz='America/New_York')
    arr = pa.array([1, 2, 3], type=ty)
    t = pa.table([arr], names=['f0'])
    _check_roundtrip(t)


def test_timestamp_restore_timezone_nanosecond():
    # ARROW-9634, also restore timezone for nanosecond data that gets stored
    # as microseconds in the parquet file
    ty = pa.timestamp('ns', tz='America/New_York')
    arr = pa.array([1000, 2000, 3000], type=ty)
    table = pa.table([arr], names=['f0'])
    ty_us = pa.timestamp('us', tz='America/New_York')
    expected = pa.table([arr.cast(ty_us)], names=['f0'])
    _check_roundtrip(table, expected=expected)


@pytest.mark.pandas
def test_list_of_datetime_time_roundtrip():
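    # A column of lists of datetime.time values should survive the roundtrip.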
    # ARROW-4135
    times = pd.to_datetime(['09:00', '09:30', '10:00', '10:30', '11:00',
                            '11:30', '12:00'])
    df = pd.DataFrame({'time': [times.time]})
    _roundtrip_pandas_dataframe(df, write_kwargs={})


@pytest.mark.pandas
def test_parquet_version_timestamp_differences():
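    # Default and allowed timestamp coercions differ between Parquet format
    # versions; check each combination below.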
    i_s = pd.Timestamp('2010-01-01').value / 1000000000  # := 1262304000

    d_s = np.arange(i_s, i_s + 10, 1, dtype='int64')
    d_ms = d_s * 1000
    d_us = d_ms * 1000
    d_ns = d_us * 1000

    a_s = pa.array(d_s, type=pa.timestamp('s'))
    a_ms = pa.array(d_ms, type=pa.timestamp('ms'))
    a_us = pa.array(d_us, type=pa.timestamp('us'))
    a_ns = pa.array(d_ns, type=pa.timestamp('ns'))

    names = ['ts:s', 'ts:ms', 'ts:us', 'ts:ns']
    table = pa.Table.from_arrays([a_s, a_ms, a_us, a_ns], names)

    # Using Parquet version 1.0, seconds should be coerced to milliseconds
    # and nanoseconds should be coerced to microseconds by default
    expected = pa.Table.from_arrays([a_ms, a_ms, a_us, a_us], names)
    _check_roundtrip(table, expected)

    # Using Parquet version 2.6, seconds should be coerced to milliseconds
    # and nanoseconds should be retained by default
    expected = pa.Table.from_arrays([a_ms, a_ms, a_us, a_ns], names)
    _check_roundtrip(table, expected, version='2.6')

    # Using Parquet version 1.0, coercing to milliseconds or microseconds
    # is allowed
    expected = pa.Table.from_arrays([a_ms, a_ms, a_ms, a_ms], names)
    _check_roundtrip(table, expected, coerce_timestamps='ms')

    # Using Parquet version 2.6, coercing to milliseconds or microseconds
    # is allowed
    expected = pa.Table.from_arrays([a_us, a_us, a_us, a_us], names)
    _check_roundtrip(table, expected, version='2.6', coerce_timestamps='us')

    # TODO: after pyarrow allows coerce_timestamps='ns', tests like the
    # following should pass ...

    # Using Parquet version 1.0, coercing to nanoseconds is not allowed
    # expected = None
    # with pytest.raises(NotImplementedError):
    #     _roundtrip_table(table, coerce_timestamps='ns')

    # Using Parquet version 2.6, coercing to nanoseconds is allowed
    # expected = pa.Table.from_arrays([a_ns, a_ns, a_ns, a_ns], names)
    # _check_roundtrip(table, expected, version='2.6', coerce_timestamps='ns')

    # For either Parquet version, coercing to nanoseconds is allowed
    # if INT96 storage is used
    expected = pa.Table.from_arrays([a_ns, a_ns, a_ns, a_ns], names)
    _check_roundtrip(table, expected,
                     use_deprecated_int96_timestamps=True)
    _check_roundtrip(table, expected, version='2.6',
                     use_deprecated_int96_timestamps=True)


@pytest.mark.pandas
def test_noncoerced_nanoseconds_written_without_exception(tempdir):
    # ARROW-1957: the Parquet version 2.6 writer preserves Arrow
    # nanosecond timestamps by default
    n = 9
    df = pd.DataFrame({'x': range(n)},
                      index=pd.date_range('2017-01-01', freq='1n', periods=n))
    tb = pa.Table.from_pandas(df)

    filename = tempdir / 'written.parquet'
    try:
        pq.write_table(tb, filename, version='2.6')
    except Exception:
        pass
    assert filename.exists()

    recovered_table = pq.read_table(filename)
    assert tb.equals(recovered_table)

    # Loss of data through coercion (without explicit override) is still
    # an error
    filename = tempdir / 'not_written.parquet'
    with pytest.raises(ValueError):
        pq.write_table(tb, filename, coerce_timestamps='ms', version='2.6')