# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from collections import OrderedDict
import io
import itertools
import json
import string
import unittest

import numpy as np
import pytest

import pyarrow as pa
from pyarrow.json import read_json, ReadOptions, ParseOptions


def generate_col_names():
    # 'a', 'b'... 'z', then 'aa', 'ab'...
    letters = string.ascii_lowercase
    yield from letters
    for first in letters:
        for second in letters:
            yield first + second


def make_random_json(num_cols=2, num_rows=10, linesep='\r\n'):
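    # Build num_rows lines of newline-delimited JSON with num_cols integer
    # columns, and the pyarrow Table the reader is expected to produce.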
    arr = np.random.RandomState(42).randint(0, 1000,
                                            size=(num_cols, num_rows))
    col_names = list(itertools.islice(generate_col_names(), num_cols))
    lines = []
    for row in arr.T:
        json_obj = OrderedDict([(k, int(v)) for (k, v) in zip(col_names, row)])
        lines.append(json.dumps(json_obj))
    data = linesep.join(lines).encode()
    columns = [pa.array(col, type=pa.int64()) for col in arr]
    expected = pa.Table.from_arrays(columns, col_names)
    return data, expected


def test_read_options():
    cls = ReadOptions
    opts = cls()

    assert opts.block_size > 0
    opts.block_size = 12345
    assert opts.block_size == 12345

    assert opts.use_threads is True
    opts.use_threads = False
    assert opts.use_threads is False

    opts = cls(block_size=1234, use_threads=False)
    assert opts.block_size == 1234
    assert opts.use_threads is False


def test_parse_options():
    cls = ParseOptions
    opts = cls()
    assert opts.newlines_in_values is False
    assert opts.explicit_schema is None

    opts.newlines_in_values = True
    assert opts.newlines_in_values is True

    schema = pa.schema([pa.field('foo', pa.int32())])
    opts.explicit_schema = schema
    assert opts.explicit_schema == schema

    assert opts.unexpected_field_behavior == "infer"
    for value in ["ignore", "error", "infer"]:
        opts.unexpected_field_behavior = value
        assert opts.unexpected_field_behavior == value

    with pytest.raises(ValueError):
        opts.unexpected_field_behavior = "invalid-value"


class BaseTestJSONRead:
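    # Subclasses provide read_json() (serial or threaded); read_bytes()
    # wraps raw bytes in an Arrow buffer so tests can pass literal JSON.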

    def read_bytes(self, b, **kwargs):
        return self.read_json(pa.py_buffer(b), **kwargs)

    def check_names(self, table, names):
        assert table.num_columns == len(names)
        assert [c.name for c in table.columns] == names

    def test_file_object(self):
        data = b'{"a": 1, "b": 2}\n'
        expected_data = {'a': [1], 'b': [2]}
        bio = io.BytesIO(data)
        table = self.read_json(bio)
        assert table.to_pydict() == expected_data
        # Text files not allowed
        sio = io.StringIO(data.decode())
        with pytest.raises(TypeError):
            self.read_json(sio)

    def test_block_sizes(self):
        rows = b'{"a": 1}\n{"a": 2}\n{"a": 3}'
        read_options = ReadOptions()
        parse_options = ParseOptions()

        for data in [rows, rows + b'\n']:
            for newlines_in_values in [False, True]:
                parse_options.newlines_in_values = newlines_in_values
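
                # A 4-byte block cannot hold a complete JSON object, so
                # the reader should fail and suggest a larger block size.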
                read_options.block_size = 4
                with pytest.raises(ValueError,
                                   match="try to increase block size"):
                    self.read_bytes(data, read_options=read_options,
                                    parse_options=parse_options)

                # Validate reader behavior with various block sizes.
                # There used to be bugs in this area.
                for block_size in range(9, 20):
                    read_options.block_size = block_size
                    table = self.read_bytes(data, read_options=read_options,
                                            parse_options=parse_options)
                    assert table.to_pydict() == {'a': [1, 2, 3]}

    def test_no_newline_at_end(self):
        rows = b'{"a": 1,"b": 2, "c": 3}\n{"a": 4,"b": 5, "c": 6}'
        table = self.read_bytes(rows)
        assert table.to_pydict() == {
            'a': [1, 4],
            'b': [2, 5],
            'c': [3, 6],
        }

    def test_simple_ints(self):
        # Infer integer columns
        rows = b'{"a": 1,"b": 2, "c": 3}\n{"a": 4,"b": 5, "c": 6}\n'
        table = self.read_bytes(rows)
        schema = pa.schema([('a', pa.int64()),
                            ('b', pa.int64()),
                            ('c', pa.int64())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [1, 4],
            'b': [2, 5],
            'c': [3, 6],
        }

    def test_simple_varied(self):
        # Infer various kinds of data
        rows = (b'{"a": 1,"b": 2, "c": "3", "d": false}\n'
                b'{"a": 4.0, "b": -5, "c": "foo", "d": true}\n')
        table = self.read_bytes(rows)
        schema = pa.schema([('a', pa.float64()),
                            ('b', pa.int64()),
                            ('c', pa.string()),
                            ('d', pa.bool_())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [1.0, 4.0],
            'b': [2, -5],
            'c': ["3", "foo"],
            'd': [False, True],
        }

    def test_simple_nulls(self):
        # Infer various kinds of data, with nulls
        rows = (b'{"a": 1, "b": 2, "c": null, "d": null, "e": null}\n'
                b'{"a": null, "b": -5, "c": "foo", "d": null, "e": true}\n'
                b'{"a": 4.5, "b": null, "c": "nan", "d": null,"e": false}\n')
        table = self.read_bytes(rows)
        schema = pa.schema([('a', pa.float64()),
                            ('b', pa.int64()),
                            ('c', pa.string()),
                            ('d', pa.null()),
                            ('e', pa.bool_())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [1.0, None, 4.5],
            'b': [2, -5, None],
            'c': [None, "foo", "nan"],
            'd': [None, None, None],
            'e': [None, True, False],
        }

    def test_empty_lists(self):
        # ARROW-10955: Infer list(null)
        rows = b'{"a": []}'
        table = self.read_bytes(rows)
        schema = pa.schema([('a', pa.list_(pa.null()))])
        assert table.schema == schema
        assert table.to_pydict() == {'a': [[]]}

    def test_empty_rows(self):
        rows = b'{}\n{}\n'
        table = self.read_bytes(rows)
        schema = pa.schema([])
        assert table.schema == schema
        assert table.num_columns == 0
        assert table.num_rows == 2

    def test_reconcile_across_blocks(self):
        # ARROW-12065: reconciling inferred types across blocks
        first_row = b'{ }\n'
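        # A block size of one row forces each row into its own block,
        # so inferred types must be reconciled across chunk boundaries.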
        read_options = ReadOptions(block_size=len(first_row))
        for next_rows, expected_pylist in [
                (b'{"a": 0}', [None, 0]),
                (b'{"a": []}', [None, []]),
                (b'{"a": []}\n{"a": [[1]]}', [None, [], [[1]]]),
                (b'{"a": {}}', [None, {}]),
                (b'{"a": {}}\n{"a": {"b": {"c": 1}}}',
                 [None, {"b": None}, {"b": {"c": 1}}]),
        ]:
            table = self.read_bytes(first_row + next_rows,
                                    read_options=read_options)
            expected = {"a": expected_pylist}
            assert table.to_pydict() == expected
            # Check that the issue was exercised
            assert table.column("a").num_chunks > 1

    def test_explicit_schema_with_unexpected_behaviour(self):
        # Fields missing from the explicit schema are inferred by default
        rows = (b'{"foo": "bar", "num": 0}\n'
                b'{"foo": "baz", "num": 1}\n')
        schema = pa.schema([
            ('foo', pa.binary())
        ])

        opts = ParseOptions(explicit_schema=schema)
        table = self.read_bytes(rows, parse_options=opts)
        assert table.schema == pa.schema([
            ('foo', pa.binary()),
            ('num', pa.int64())
        ])
        assert table.to_pydict() == {
            'foo': [b'bar', b'baz'],
            'num': [0, 1],
        }

        # ignore the unexpected fields
        opts = ParseOptions(explicit_schema=schema,
                            unexpected_field_behavior="ignore")
        table = self.read_bytes(rows, parse_options=opts)
        assert table.schema == pa.schema([
            ('foo', pa.binary()),
        ])
        assert table.to_pydict() == {
            'foo': [b'bar', b'baz'],
        }

        # raise error
        opts = ParseOptions(explicit_schema=schema,
                            unexpected_field_behavior="error")
        with pytest.raises(pa.ArrowInvalid,
                           match="JSON parse error: unexpected field"):
            self.read_bytes(rows, parse_options=opts)

    def test_small_random_json(self):
        data, expected = make_random_json(num_cols=2, num_rows=10)
        table = self.read_bytes(data)
        assert table.schema == expected.schema
        assert table.equals(expected)
        assert table.to_pydict() == expected.to_pydict()

    def test_stress_block_sizes(self):
        # Test a number of small block sizes to stress block stitching
        data_base, expected = make_random_json(num_cols=2, num_rows=100)
        read_options = ReadOptions()
        parse_options = ParseOptions()

        for data in [data_base, data_base.rstrip(b'\r\n')]:
            for newlines_in_values in [False, True]:
                parse_options.newlines_in_values = newlines_in_values
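                # Block sizes misaligned with row boundaries force rows
                # to be stitched together from adjacent blocks.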
                for block_size in [22, 23, 37]:
                    read_options.block_size = block_size
                    table = self.read_bytes(data, read_options=read_options,
                                            parse_options=parse_options)
                    assert table.schema == expected.schema
                    if not table.equals(expected):
                        # Better error output
                        assert table.to_pydict() == expected.to_pydict()


class TestSerialJSONRead(BaseTestJSONRead, unittest.TestCase):

    def read_json(self, *args, **kwargs):
        read_options = kwargs.setdefault('read_options', ReadOptions())
        read_options.use_threads = False
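        # The bare read_json call below resolves to the module-level
        # function imported from pyarrow.json, not to this method.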
        table = read_json(*args, **kwargs)
        table.validate(full=True)
        return table


class TestParallelJSONRead(BaseTestJSONRead, unittest.TestCase):
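    # Same harness, but with multithreaded reads enabled.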

    def read_json(self, *args, **kwargs):
        read_options = kwargs.setdefault('read_options', ReadOptions())
        read_options.use_threads = True
        table = read_json(*args, **kwargs)
        table.validate(full=True)
        return table