// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import {
    generateRandomTables,
    // generateDictionaryTables
} from '../../../data/tables';

import {
    Table,
    RecordBatchReader,
    RecordBatchStreamWriter
} from 'apache-arrow';

import { validateRecordBatchAsyncIterator } from '../validate';
import { ArrowIOTestHelper, readableDOMStreamToAsyncIterator } from '../helpers';

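// These tests cover Arrow's WhatWG (DOM) stream integration: piping IPC bytes
// through RecordBatchReader.throughDOM(), constructing readers from the many
// source types RecordBatchReader.from() accepts, and reading multiple tables
// from a single concatenated byte stream. They are skipped unless the
// TEST_DOM_STREAMS environment variable is enabled (see the guard below).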
(() => {

    if (process.env.TEST_DOM_STREAMS !== 'true') {
        return test('not testing DOM streams because process.env.TEST_DOM_STREAMS !== "true"', () => {});
    }

    for (const table of generateRandomTables([10, 20, 30])) {

        const file = ArrowIOTestHelper.file(table);
        const json = ArrowIOTestHelper.json(table);
        const stream = ArrowIOTestHelper.stream(table);
        const name = `[\n ${table.schema.fields.join(',\n ')}\n]`;

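        // Pipe the raw IPC bytes (file or stream format) through
        // RecordBatchReader.throughDOM() and verify the resulting DOM
        // ReadableStream yields the expected number of record batches
        // (3, matching the three batch lengths passed to
        // generateRandomTables above).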
        describe(`RecordBatchReader.throughDOM (${name})`, () => {
            describe('file', () => {
                test('ReadableStream', file.whatwgReadableStream(validate));
                test('ReadableByteStream', file.whatwgReadableByteStream(validate));
            });
            describe('stream', () => {
                test('ReadableStream', stream.whatwgReadableStream(validate));
                test('ReadableByteStream', stream.whatwgReadableByteStream(validate));
            });
            async function validate(source: ReadableStream) {
                const stream = source.pipeThrough(RecordBatchReader.throughDOM());
                await validateRecordBatchAsyncIterator(3, readableDOMStreamToAsyncIterator(stream));
            }
        });

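        // Exercise every source type RecordBatchReader.from() accepts -- plain
        // buffers, (a)sync iterables, Node file handles and streams, WhatWG
        // streams, and promises of each -- then validate the record batches
        // exposed via reader.toDOMStream().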
        describe(`toDOMStream (${name})`, () => {

            describe(`RecordBatchJSONReader`, () => {
                test('Uint8Array', json.buffer((source) => validate(JSON.parse(`${Buffer.from(source)}`))));
            });

            describe(`RecordBatchFileReader`, () => {
                test(`Uint8Array`, file.buffer(validate));
                test(`Iterable`, file.iterable(validate));
                test('AsyncIterable', file.asyncIterable(validate));
                test('fs.FileHandle', file.fsFileHandle(validate));
                test('fs.ReadStream', file.fsReadableStream(validate));
                test('stream.Readable', file.nodeReadableStream(validate));
                test('whatwg.ReadableStream', file.whatwgReadableStream(validate));
                test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate));
                test('Promise<AsyncIterable>', file.asyncIterable((source) => validate(Promise.resolve(source))));
                test('Promise<fs.FileHandle>', file.fsFileHandle((source) => validate(Promise.resolve(source))));
                test('Promise<fs.ReadStream>', file.fsReadableStream((source) => validate(Promise.resolve(source))));
                test('Promise<stream.Readable>', file.nodeReadableStream((source) => validate(Promise.resolve(source))));
                test('Promise<ReadableStream>', file.whatwgReadableStream((source) => validate(Promise.resolve(source))));
                test('Promise<ReadableByteStream>', file.whatwgReadableByteStream((source) => validate(Promise.resolve(source))));
            });

            describe(`RecordBatchStreamReader`, () => {
                test(`Uint8Array`, stream.buffer(validate));
                test(`Iterable`, stream.iterable(validate));
                test('AsyncIterable', stream.asyncIterable(validate));
                test('fs.FileHandle', stream.fsFileHandle(validate));
                test('fs.ReadStream', stream.fsReadableStream(validate));
                test('stream.Readable', stream.nodeReadableStream(validate));
                test('whatwg.ReadableStream', stream.whatwgReadableStream(validate));
                test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate));
                test('Promise<AsyncIterable>', stream.asyncIterable((source) => validate(Promise.resolve(source))));
                test('Promise<fs.FileHandle>', stream.fsFileHandle((source) => validate(Promise.resolve(source))));
                test('Promise<fs.ReadStream>', stream.fsReadableStream((source) => validate(Promise.resolve(source))));
                test('Promise<stream.Readable>', stream.nodeReadableStream((source) => validate(Promise.resolve(source))));
                test('Promise<ReadableStream>', stream.whatwgReadableStream((source) => validate(Promise.resolve(source))));
                test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream((source) => validate(Promise.resolve(source))));
            });

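            // Shared validator: build a reader from whatever source the helper
            // hands us, convert reader.toDOMStream() into an async iterator,
            // and expect the three batches generated above.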
            async function validate(source: any) {
                const reader: RecordBatchReader = await RecordBatchReader.from(source);
                const iterator = readableDOMStreamToAsyncIterator(reader.toDOMStream());
                await validateRecordBatchAsyncIterator(3, iterator);
            }
        });
    }

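    // The remaining tests concatenate several tables into one contiguous IPC
    // byte stream (via concatStream from @openpgp/web-stream-tools) and check
    // how RecordBatchReader.readAll() manages the underlying DOM stream's
    // lock and lifetime across table boundaries.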
    it('readAll() should pipe to separate WhatWG WritableStreams', async () => {
        // @ts-ignore
        const { concatStream } = await import('@openpgp/web-stream-tools');

        expect.hasAssertions();

        const tables = [...generateRandomTables([10, 20, 30])];

        const stream = concatStream(tables.map((table, i) =>
            RecordBatchStreamWriter.writeAll(table).toDOMStream({
                // Alternate between bytes mode and regular mode for better code coverage
                type: i % 2 === 0 ? 'bytes' : undefined
            })
        )) as ReadableStream<Uint8Array>;

        let tableIndex = -1;
        let reader: RecordBatchReader | undefined;

        for await (reader of RecordBatchReader.readAll(stream)) {

            validateStreamState(reader, stream, false);

            const output = reader
                .pipeThrough(RecordBatchStreamWriter.throughDOM())
                .pipeThrough(new TransformStream());

            validateStreamState(reader, output, false, false);

            const sourceTable = tables[++tableIndex];
            const streamTable = await Table.from(output);
            expect(streamTable).toEqualTable(sourceTable);
            expect(output.locked).toBe(false);
        }

        expect(reader).toBeDefined();
        validateStreamState(reader!, stream, true);
        expect(tableIndex).toBe(tables.length - 1);
    });

    it('should not close the underlying WhatWG ReadableStream when reading multiple tables to completion', async () => {
        // @ts-ignore
        const { concatStream } = await import('@openpgp/web-stream-tools');

        expect.hasAssertions();

        const tables = [...generateRandomTables([10, 20, 30])];

        const stream = concatStream(tables.map((table, i) =>
            RecordBatchStreamWriter.writeAll(table).toDOMStream({
                // Alternate between bytes mode and regular mode for better code coverage
                type: i % 2 === 0 ? 'bytes' : undefined
            })
        )) as ReadableStream<Uint8Array>;

        let tableIndex = -1;
        let reader = await RecordBatchReader.from(stream);

        validateStreamState(reader, stream, false);

        for await (reader of RecordBatchReader.readAll(reader)) {

            validateStreamState(reader, stream, false);

            const sourceTable = tables[++tableIndex];
            const streamTable = await Table.from(reader);
            expect(streamTable).toEqualTable(sourceTable);
        }

        validateStreamState(reader, stream, true);
        expect(tableIndex).toBe(tables.length - 1);
    });

    it('should close the underlying WhatWG ReadableStream when reading multiple tables and we break early', async () => {
        // @ts-ignore
        const { concatStream } = await import('@openpgp/web-stream-tools');

        expect.hasAssertions();

        const tables = [...generateRandomTables([10, 20, 30])];

        const stream = concatStream(tables.map((table, i) =>
            RecordBatchStreamWriter.writeAll(table).toDOMStream({
                // Alternate between bytes mode and regular mode for better code coverage
                type: i % 2 === 0 ? 'bytes' : undefined
            })
        )) as ReadableStream<Uint8Array>;

        let tableIndex = -1;
        let reader = await RecordBatchReader.from(stream);

        validateStreamState(reader, stream, false);

        for await (reader of RecordBatchReader.readAll(reader)) {

            validateStreamState(reader, stream, false);

            let batchIndex = -1;
            const sourceTable = tables[++tableIndex];
            const breakEarly = tableIndex === (tables.length / 2 | 0);

            for await (const streamBatch of reader) {
                expect(streamBatch).toEqualRecordBatch(sourceTable.chunks[++batchIndex]);
                if (breakEarly && batchIndex === 1) { break; }
            }
            if (breakEarly) {
                // the reader should stay open until we break from the outermost loop
                validateStreamState(reader, stream, false);
                break;
            }
        }

        validateStreamState(reader, stream, true);
        expect(tableIndex).toBe(tables.length / 2 | 0);
    });
})();

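// Assert the reader's closed flag and whether the given DOM stream is
// currently locked; by default, a closed reader implies an unlocked stream.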
function validateStreamState(reader: RecordBatchReader, stream: ReadableStream, closed: boolean, locked = !closed) {
    expect(reader.closed).toBe(closed);
    expect(stream.locked).toBe(locked);
}