// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

18 | import { | |
19 | generateRandomTables, | |
20 | // generateDictionaryTables | |
21 | } from '../../../data/tables'; | |
22 | ||
23 | import { | |
24 | Table, | |
25 | RecordBatchReader, | |
26 | RecordBatchStreamWriter | |
27 | } from 'apache-arrow'; | |
28 | ||
29 | import { validateRecordBatchAsyncIterator } from '../validate'; | |
30 | import { ArrowIOTestHelper, readableDOMStreamToAsyncIterator } from '../helpers'; | |
31 | ||
32 | (() => { | |
33 | ||
34 | if (process.env.TEST_DOM_STREAMS !== 'true') { | |
35 | return test('not testing DOM streams because process.env.TEST_DOM_STREAMS !== "true"', () => {}); | |
36 | } | |
37 | ||
38 | for (const table of generateRandomTables([10, 20, 30])) { | |
39 | ||
40 | const file = ArrowIOTestHelper.file(table); | |
41 | const json = ArrowIOTestHelper.json(table); | |
42 | const stream = ArrowIOTestHelper.stream(table); | |
43 | const name = `[\n ${table.schema.fields.join(',\n ')}\n]`; | |
44 | ||
45 | describe(`RecordBatchReader.throughDOM (${name})`, () => { | |
46 | describe('file', () => { | |
47 | test('ReadableStream', file.whatwgReadableStream(validate)); | |
48 | test('ReadableByteStream', file.whatwgReadableByteStream(validate)); | |
49 | }); | |
50 | describe('stream', () => { | |
51 | test('ReadableStream', stream.whatwgReadableStream(validate)); | |
52 | test('ReadableByteStream', stream.whatwgReadableByteStream(validate)); | |
53 | }); | |
54 | async function validate(source: ReadableStream) { | |
55 | const stream = source.pipeThrough(RecordBatchReader.throughDOM()); | |
56 | await validateRecordBatchAsyncIterator(3, readableDOMStreamToAsyncIterator(stream)); | |
57 | } | |
58 | }); | |
59 | ||
60 | describe(`toDOMStream (${name})`, () => { | |
61 | ||
62 | describe(`RecordBatchJSONReader`, () => { | |
63 | test('Uint8Array', json.buffer((source) => validate(JSON.parse(`${Buffer.from(source)}`)))); | |
64 | }); | |
65 | ||
66 | describe(`RecordBatchFileReader`, () => { | |
67 | test(`Uint8Array`, file.buffer(validate)); | |
68 | test(`Iterable`, file.iterable(validate)); | |
69 | test('AsyncIterable', file.asyncIterable(validate)); | |
70 | test('fs.FileHandle', file.fsFileHandle(validate)); | |
71 | test('fs.ReadStream', file.fsReadableStream(validate)); | |
72 | test('stream.Readable', file.nodeReadableStream(validate)); | |
73 | test('whatwg.ReadableStream', file.whatwgReadableStream(validate)); | |
74 | test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate)); | |
75 | test('Promise<AsyncIterable>', file.asyncIterable((source) => validate(Promise.resolve(source)))); | |
76 | test('Promise<fs.FileHandle>', file.fsFileHandle((source) => validate(Promise.resolve(source)))); | |
77 | test('Promise<fs.ReadStream>', file.fsReadableStream((source) => validate(Promise.resolve(source)))); | |
78 | test('Promise<stream.Readable>', file.nodeReadableStream((source) => validate(Promise.resolve(source)))); | |
79 | test('Promise<ReadableStream>', file.whatwgReadableStream((source) => validate(Promise.resolve(source)))); | |
80 | test('Promise<ReadableByteStream>', file.whatwgReadableByteStream((source) => validate(Promise.resolve(source)))); | |
81 | }); | |
82 | ||
83 | describe(`RecordBatchStreamReader`, () => { | |
84 | test(`Uint8Array`, stream.buffer(validate)); | |
85 | test(`Iterable`, stream.iterable(validate)); | |
86 | test('AsyncIterable', stream.asyncIterable(validate)); | |
87 | test('fs.FileHandle', stream.fsFileHandle(validate)); | |
88 | test('fs.ReadStream', stream.fsReadableStream(validate)); | |
89 | test('stream.Readable', stream.nodeReadableStream(validate)); | |
90 | test('whatwg.ReadableStream', stream.whatwgReadableStream(validate)); | |
91 | test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate)); | |
92 | test('Promise<AsyncIterable>', stream.asyncIterable((source) => validate(Promise.resolve(source)))); | |
93 | test('Promise<fs.FileHandle>', stream.fsFileHandle((source) => validate(Promise.resolve(source)))); | |
94 | test('Promise<fs.ReadStream>', stream.fsReadableStream((source) => validate(Promise.resolve(source)))); | |
95 | test('Promise<stream.Readable>', stream.nodeReadableStream((source) => validate(Promise.resolve(source)))); | |
96 | test('Promise<ReadableStream>', stream.whatwgReadableStream((source) => validate(Promise.resolve(source)))); | |
97 | test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream((source) => validate(Promise.resolve(source)))); | |
98 | }); | |
99 | ||
100 | async function validate(source: any) { | |
101 | const reader: RecordBatchReader = await RecordBatchReader.from(source); | |
102 | const iterator = readableDOMStreamToAsyncIterator(reader.toDOMStream()); | |
103 | await validateRecordBatchAsyncIterator(3, iterator); | |
104 | } | |
105 | }); | |
106 | } | |
107 | ||
108 | it('readAll() should pipe to separate WhatWG WritableStreams', async () => { | |
109 | // @ts-ignore | |
110 | const { concatStream } = await import('@openpgp/web-stream-tools'); | |
111 | ||
112 | expect.hasAssertions(); | |
113 | ||
114 | const tables = [...generateRandomTables([10, 20, 30])]; | |
115 | ||
116 | const stream = concatStream(tables.map((table, i) => | |
117 | RecordBatchStreamWriter.writeAll(table).toDOMStream({ | |
118 | // Alternate between bytes mode and regular mode because code coverage | |
119 | type: i % 2 === 0 ? 'bytes' : undefined | |
120 | }) | |
121 | )) as ReadableStream<Uint8Array>; | |
122 | ||
123 | let tableIndex = -1; | |
124 | let reader: RecordBatchReader | undefined; | |
125 | ||
126 | for await (reader of RecordBatchReader.readAll(stream)) { | |
127 | ||
128 | validateStreamState(reader, stream, false); | |
129 | ||
130 | const output = reader | |
131 | .pipeThrough(RecordBatchStreamWriter.throughDOM()) | |
132 | .pipeThrough(new TransformStream()); | |
133 | ||
134 | validateStreamState(reader, output, false, false); | |
135 | ||
136 | const sourceTable = tables[++tableIndex]; | |
137 | const streamTable = await Table.from(output); | |
138 | expect(streamTable).toEqualTable(sourceTable); | |
139 | expect(output.locked).toBe(false); | |
140 | } | |
141 | ||
142 | expect(reader).toBeDefined(); | |
143 | validateStreamState(reader!, stream, true); | |
144 | expect(tableIndex).toBe(tables.length - 1); | |
145 | }); | |
146 | ||
147 | it('should not close the underlying WhatWG ReadableStream when reading multiple tables to completion', async () => { | |
148 | // @ts-ignore | |
149 | const { concatStream } = await import('@openpgp/web-stream-tools'); | |
150 | ||
151 | expect.hasAssertions(); | |
152 | ||
153 | const tables = [...generateRandomTables([10, 20, 30])]; | |
154 | ||
155 | const stream = concatStream(tables.map((table, i) => | |
156 | RecordBatchStreamWriter.writeAll(table).toDOMStream({ | |
157 | // Alternate between bytes mode and regular mode because code coverage | |
158 | type: i % 2 === 0 ? 'bytes' : undefined | |
159 | }) | |
160 | )) as ReadableStream<Uint8Array>; | |
161 | ||
162 | let tableIndex = -1; | |
163 | let reader = await RecordBatchReader.from(stream); | |
164 | ||
165 | validateStreamState(reader, stream, false); | |
166 | ||
167 | for await (reader of RecordBatchReader.readAll(reader)) { | |
168 | ||
169 | validateStreamState(reader, stream, false); | |
170 | ||
171 | const sourceTable = tables[++tableIndex]; | |
172 | const streamTable = await Table.from(reader); | |
173 | expect(streamTable).toEqualTable(sourceTable); | |
174 | } | |
175 | ||
176 | validateStreamState(reader, stream, true); | |
177 | expect(tableIndex).toBe(tables.length - 1); | |
178 | }); | |
179 | ||
180 | it('should close the underlying WhatWG ReadableStream when reading multiple tables and we break early', async () => { | |
181 | // @ts-ignore | |
182 | const { concatStream } = await import('@openpgp/web-stream-tools'); | |
183 | ||
184 | expect.hasAssertions(); | |
185 | ||
186 | const tables = [...generateRandomTables([10, 20, 30])]; | |
187 | ||
188 | const stream = concatStream(tables.map((table, i) => | |
189 | RecordBatchStreamWriter.writeAll(table).toDOMStream({ | |
190 | // Alternate between bytes mode and regular mode because code coverage | |
191 | type: i % 2 === 0 ? 'bytes' : undefined | |
192 | }) | |
193 | )) as ReadableStream<Uint8Array>; | |
194 | ||
195 | let tableIndex = -1; | |
196 | let reader = await RecordBatchReader.from(stream); | |
197 | ||
198 | validateStreamState(reader, stream, false); | |
199 | ||
200 | for await (reader of RecordBatchReader.readAll(reader)) { | |
201 | ||
202 | validateStreamState(reader, stream, false); | |
203 | ||
204 | let batchIndex = -1; | |
205 | const sourceTable = tables[++tableIndex]; | |
206 | const breakEarly = tableIndex === (tables.length / 2 | 0); | |
207 | ||
208 | for await (const streamBatch of reader) { | |
209 | expect(streamBatch).toEqualRecordBatch(sourceTable.chunks[++batchIndex]); | |
210 | if (breakEarly && batchIndex === 1) { break; } | |
211 | } | |
212 | if (breakEarly) { | |
213 | // the reader should stay open until we break from the outermost loop | |
214 | validateStreamState(reader, stream, false); | |
215 | break; | |
216 | } | |
217 | } | |
218 | ||
219 | validateStreamState(reader, stream, true); | |
220 | expect(tableIndex).toBe(tables.length / 2 | 0); | |
221 | }); | |
222 | })(); | |
223 | ||
224 | function validateStreamState(reader: RecordBatchReader, stream: ReadableStream, closed: boolean, locked = !closed) { | |
225 | expect(reader.closed).toBe(closed); | |
226 | expect(stream.locked).toBe(locked); | |
227 | } |