# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

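# Tests for pyarrow.json.read_json, the line-delimited (NDJSON) reader.
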
from collections import OrderedDict
import io
import itertools
import json
import string
import unittest

import numpy as np
import pytest

import pyarrow as pa
from pyarrow.json import read_json, ReadOptions, ParseOptions


def generate_col_names():
    # 'a', 'b'... 'z', then 'aa', 'ab'...
    letters = string.ascii_lowercase
    yield from letters
    for first in letters:
        for second in letters:
            yield first + second


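# The helper below builds deterministic test data: num_rows lines of the
# form '{"a": <int>, "b": <int>, ...}' joined by linesep (CRLF by default),
# together with the equivalent pyarrow.Table of int64 columns that the
# reader is expected to reconstruct.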
def make_random_json(num_cols=2, num_rows=10, linesep='\r\n'):
    arr = np.random.RandomState(42).randint(0, 1000, size=(num_cols, num_rows))
    col_names = list(itertools.islice(generate_col_names(), num_cols))
    lines = []
    for row in arr.T:
        json_obj = OrderedDict([(k, int(v)) for (k, v) in zip(col_names, row)])
        lines.append(json.dumps(json_obj))
    data = linesep.join(lines).encode()
    columns = [pa.array(col, type=pa.int64()) for col in arr]
    expected = pa.Table.from_arrays(columns, col_names)
    return data, expected


def test_read_options():
    cls = ReadOptions
    opts = cls()

    assert opts.block_size > 0
    opts.block_size = 12345
    assert opts.block_size == 12345

    assert opts.use_threads is True
    opts.use_threads = False
    assert opts.use_threads is False

    opts = cls(block_size=1234, use_threads=False)
    assert opts.block_size == 1234
    assert opts.use_threads is False


def test_parse_options():
    cls = ParseOptions
    opts = cls()
    assert opts.newlines_in_values is False
    assert opts.explicit_schema is None

    opts.newlines_in_values = True
    assert opts.newlines_in_values is True

    schema = pa.schema([pa.field('foo', pa.int32())])
    opts.explicit_schema = schema
    assert opts.explicit_schema == schema

    assert opts.unexpected_field_behavior == "infer"
    for value in ["ignore", "error", "infer"]:
        opts.unexpected_field_behavior = value
        assert opts.unexpected_field_behavior == value

    with pytest.raises(ValueError):
        opts.unexpected_field_behavior = "invalid-value"


class BaseTestJSONRead:
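    # Test body shared by the serial and multi-threaded readers: the
    # concrete subclasses at the bottom of this file provide read_json(),
    # so every test here runs under both configurations.
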
    def read_bytes(self, b, **kwargs):
        return self.read_json(pa.py_buffer(b), **kwargs)

    def check_names(self, table, names):
        assert table.num_columns == len(names)
        assert [c.name for c in table.columns] == names

    def test_file_object(self):
        data = b'{"a": 1, "b": 2}\n'
        expected_data = {'a': [1], 'b': [2]}
        bio = io.BytesIO(data)
        table = self.read_json(bio)
        assert table.to_pydict() == expected_data
        # Text files not allowed
        sio = io.StringIO(data.decode())
        with pytest.raises(TypeError):
            self.read_json(sio)

    def test_block_sizes(self):
        rows = b'{"a": 1}\n{"a": 2}\n{"a": 3}'
        read_options = ReadOptions()
        parse_options = ParseOptions()

        for data in [rows, rows + b'\n']:
            for newlines_in_values in [False, True]:
                parse_options.newlines_in_values = newlines_in_values
                read_options.block_size = 4
                with pytest.raises(ValueError,
                                   match="try to increase block size"):
                    self.read_bytes(data, read_options=read_options,
                                    parse_options=parse_options)

                # Validate reader behavior with various block sizes.
                # There used to be bugs in this area.
                for block_size in range(9, 20):
                    read_options.block_size = block_size
                    table = self.read_bytes(data, read_options=read_options,
                                            parse_options=parse_options)
                    assert table.to_pydict() == {'a': [1, 2, 3]}

    def test_no_newline_at_end(self):
        rows = b'{"a": 1,"b": 2, "c": 3}\n{"a": 4,"b": 5, "c": 6}'
        table = self.read_bytes(rows)
        assert table.to_pydict() == {
            'a': [1, 4],
            'b': [2, 5],
            'c': [3, 6],
        }

    def test_simple_ints(self):
        # Infer integer columns
        rows = b'{"a": 1,"b": 2, "c": 3}\n{"a": 4,"b": 5, "c": 6}\n'
        table = self.read_bytes(rows)
        schema = pa.schema([('a', pa.int64()),
                            ('b', pa.int64()),
                            ('c', pa.int64())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [1, 4],
            'b': [2, 5],
            'c': [3, 6],
        }

    def test_simple_varied(self):
        # Infer various kinds of data
        rows = (b'{"a": 1,"b": 2, "c": "3", "d": false}\n'
                b'{"a": 4.0, "b": -5, "c": "foo", "d": true}\n')
        table = self.read_bytes(rows)
        schema = pa.schema([('a', pa.float64()),
                            ('b', pa.int64()),
                            ('c', pa.string()),
                            ('d', pa.bool_())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [1.0, 4.0],
            'b': [2, -5],
            'c': ["3", "foo"],
            'd': [False, True],
        }

    def test_simple_nulls(self):
        # Infer various kinds of data, with nulls
        rows = (b'{"a": 1, "b": 2, "c": null, "d": null, "e": null}\n'
                b'{"a": null, "b": -5, "c": "foo", "d": null, "e": true}\n'
                b'{"a": 4.5, "b": null, "c": "nan", "d": null,"e": false}\n')
        table = self.read_bytes(rows)
        schema = pa.schema([('a', pa.float64()),
                            ('b', pa.int64()),
                            ('c', pa.string()),
                            ('d', pa.null()),
                            ('e', pa.bool_())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [1.0, None, 4.5],
            'b': [2, -5, None],
            'c': [None, "foo", "nan"],
            'd': [None, None, None],
            'e': [None, True, False],
        }

    def test_empty_lists(self):
        # ARROW-10955: Infer list(null)
        rows = b'{"a": []}'
        table = self.read_bytes(rows)
        schema = pa.schema([('a', pa.list_(pa.null()))])
        assert table.schema == schema
        assert table.to_pydict() == {'a': [[]]}

    def test_empty_rows(self):
        rows = b'{}\n{}\n'
        table = self.read_bytes(rows)
        schema = pa.schema([])
        assert table.schema == schema
        assert table.num_columns == 0
        assert table.num_rows == 2

    def test_reconcile_across_blocks(self):
        # ARROW-12065: reconciling inferred types across blocks
        first_row = b'{ }\n'
        read_options = ReadOptions(block_size=len(first_row))
        for next_rows, expected_pylist in [
                (b'{"a": 0}', [None, 0]),
                (b'{"a": []}', [None, []]),
                (b'{"a": []}\n{"a": [[1]]}', [None, [], [[1]]]),
                (b'{"a": {}}', [None, {}]),
                (b'{"a": {}}\n{"a": {"b": {"c": 1}}}',
                 [None, {"b": None}, {"b": {"c": 1}}]),
        ]:
            table = self.read_bytes(first_row + next_rows,
                                    read_options=read_options)
            expected = {"a": expected_pylist}
            assert table.to_pydict() == expected
            # Check that the issue was exercised
            assert table.column("a").num_chunks > 1

    def test_explicit_schema_with_unexpected_behaviour(self):
        # infer by default
        rows = (b'{"foo": "bar", "num": 0}\n'
                b'{"foo": "baz", "num": 1}\n')
        schema = pa.schema([
            ('foo', pa.binary())
        ])

        opts = ParseOptions(explicit_schema=schema)
        table = self.read_bytes(rows, parse_options=opts)
        assert table.schema == pa.schema([
            ('foo', pa.binary()),
            ('num', pa.int64())
        ])
        assert table.to_pydict() == {
            'foo': [b'bar', b'baz'],
            'num': [0, 1],
        }

        # ignore the unexpected fields
        opts = ParseOptions(explicit_schema=schema,
                            unexpected_field_behavior="ignore")
        table = self.read_bytes(rows, parse_options=opts)
        assert table.schema == pa.schema([
            ('foo', pa.binary()),
        ])
        assert table.to_pydict() == {
            'foo': [b'bar', b'baz'],
        }

        # raise error
        opts = ParseOptions(explicit_schema=schema,
                            unexpected_field_behavior="error")
        with pytest.raises(pa.ArrowInvalid,
                           match="JSON parse error: unexpected field"):
            self.read_bytes(rows, parse_options=opts)

    def test_small_random_json(self):
        data, expected = make_random_json(num_cols=2, num_rows=10)
        table = self.read_bytes(data)
        assert table.schema == expected.schema
        assert table.equals(expected)
        assert table.to_pydict() == expected.to_pydict()

    def test_stress_block_sizes(self):
        # Test a number of small block sizes to stress block stitching
        data_base, expected = make_random_json(num_cols=2, num_rows=100)
        read_options = ReadOptions()
        parse_options = ParseOptions()

        for data in [data_base, data_base.rstrip(b'\r\n')]:
            for newlines_in_values in [False, True]:
                parse_options.newlines_in_values = newlines_in_values
                for block_size in [22, 23, 37]:
                    read_options.block_size = block_size
                    table = self.read_bytes(data, read_options=read_options,
                                            parse_options=parse_options)
                    assert table.schema == expected.schema
                    if not table.equals(expected):
                        # Better error output
                        assert table.to_pydict() == expected.to_pydict()


class TestSerialJSONRead(BaseTestJSONRead, unittest.TestCase):

    def read_json(self, *args, **kwargs):
        read_options = kwargs.setdefault('read_options', ReadOptions())
        read_options.use_threads = False
        # The unqualified call resolves to the module-level read_json
        # imported from pyarrow.json, not to this method.
        table = read_json(*args, **kwargs)
        table.validate(full=True)
        return table


class TestParallelJSONRead(BaseTestJSONRead, unittest.TestCase):

    def read_json(self, *args, **kwargs):
        read_options = kwargs.setdefault('read_options', ReadOptions())
        read_options.use_threads = True
        table = read_json(*args, **kwargs)
        table.validate(full=True)
        return table