1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | import '../../jest-extensions'; | |
19 | ||
20 | import { | |
21 | Table, | |
22 | RecordBatchWriter, | |
23 | RecordBatchFileWriter, | |
24 | RecordBatchJSONWriter, | |
25 | RecordBatchStreamWriter, | |
26 | } from 'apache-arrow'; | |
27 | ||
28 | import * as fs from 'fs'; | |
29 | import { fs as memfs } from 'memfs'; | |
30 | import { Readable, PassThrough } from 'stream'; | |
31 | import randomatic from 'randomatic'; | |
32 | ||
33 | export abstract class ArrowIOTestHelper { | |
34 | ||
35 | constructor(public table: Table) {} | |
36 | ||
37 | public static file(table: Table) { return new ArrowFileIOTestHelper(table); } | |
38 | public static json(table: Table) { return new ArrowJsonIOTestHelper(table); } | |
39 | public static stream(table: Table) { return new ArrowStreamIOTestHelper(table); } | |
40 | ||
41 | protected abstract writer(table: Table): RecordBatchWriter; | |
42 | protected async filepath(table: Table): Promise<fs.PathLike> { | |
43 | const path = `/${randomatic('a0', 20)}.arrow`; | |
44 | const data = await this.writer(table).toUint8Array(); | |
45 | await memfs.promises.writeFile(path, data); | |
46 | return path; | |
47 | } | |
48 | ||
49 | buffer(testFn: (buffer: Uint8Array) => void | Promise<void>) { | |
50 | return async () => { | |
51 | expect.hasAssertions(); | |
52 | await testFn(await this.writer(this.table).toUint8Array()); | |
53 | }; | |
54 | } | |
55 | iterable(testFn: (iterable: Generator<Uint8Array>) => void | Promise<void>) { | |
56 | return async () => { | |
57 | expect.hasAssertions(); | |
58 | await testFn(chunkedIterable(await this.writer(this.table).toUint8Array())); | |
59 | }; | |
60 | } | |
61 | asyncIterable(testFn: (asyncIterable: AsyncGenerator<Uint8Array>) => void | Promise<void>) { | |
62 | return async () => { | |
63 | expect.hasAssertions(); | |
64 | await testFn(asyncChunkedIterable(await this.writer(this.table).toUint8Array())); | |
65 | }; | |
66 | } | |
67 | fsFileHandle(testFn: (handle: fs.promises.FileHandle) => void | Promise<void>) { | |
68 | return async () => { | |
69 | expect.hasAssertions(); | |
70 | const path = await this.filepath(this.table); | |
71 | await testFn(<any> await memfs.promises.open(path, 'r')); | |
72 | await memfs.promises.unlink(path); | |
73 | }; | |
74 | } | |
75 | fsReadableStream(testFn: (stream: fs.ReadStream) => void | Promise<void>) { | |
76 | return async () => { | |
77 | expect.hasAssertions(); | |
78 | const path = await this.filepath(this.table); | |
79 | await testFn(<any> memfs.createReadStream(path)); | |
80 | await memfs.promises.unlink(path); | |
81 | }; | |
82 | } | |
83 | nodeReadableStream(testFn: (stream: NodeJS.ReadableStream) => void | Promise<void>) { | |
84 | return async () => { | |
85 | expect.hasAssertions(); | |
86 | const sink = new PassThrough(); | |
87 | sink.end(await this.writer(this.table).toUint8Array()); | |
88 | await testFn(sink); | |
89 | }; | |
90 | } | |
91 | whatwgReadableStream(testFn: (stream: ReadableStream) => void | Promise<void>) { | |
92 | return async () => { | |
93 | expect.hasAssertions(); | |
94 | const path = await this.filepath(this.table); | |
95 | await testFn(nodeToDOMStream(memfs.createReadStream(path))); | |
96 | await memfs.promises.unlink(path); | |
97 | }; | |
98 | } | |
99 | whatwgReadableByteStream(testFn: (stream: ReadableStream) => void | Promise<void>) { | |
100 | return async () => { | |
101 | expect.hasAssertions(); | |
102 | const path = await this.filepath(this.table); | |
103 | await testFn(nodeToDOMStream(memfs.createReadStream(path), { type: 'bytes' })); | |
104 | await memfs.promises.unlink(path); | |
105 | }; | |
106 | } | |
107 | } | |
108 | ||
109 | class ArrowFileIOTestHelper extends ArrowIOTestHelper { | |
110 | constructor(table: Table) { super(table); } | |
111 | protected writer(table: Table) { | |
112 | return RecordBatchFileWriter.writeAll(table); | |
113 | } | |
114 | } | |
115 | ||
116 | class ArrowJsonIOTestHelper extends ArrowIOTestHelper { | |
117 | constructor(table: Table) { super(table); } | |
118 | protected writer(table: Table) { | |
119 | return RecordBatchJSONWriter.writeAll(table); | |
120 | } | |
121 | } | |
122 | ||
123 | class ArrowStreamIOTestHelper extends ArrowIOTestHelper { | |
124 | constructor(table: Table) { super(table); } | |
125 | protected writer(table: Table) { | |
126 | return RecordBatchStreamWriter.writeAll(table); | |
127 | } | |
128 | } | |
129 | ||
130 | export function* chunkedIterable(buffer: Uint8Array) { | |
131 | let offset = 0, size = 0; | |
132 | while (offset < buffer.byteLength) { | |
133 | size = yield buffer.subarray(offset, offset += | |
134 | (isNaN(+size) ? buffer.byteLength - offset : size)); | |
135 | } | |
136 | } | |
137 | ||
138 | export async function* asyncChunkedIterable(buffer: Uint8Array) { | |
139 | let offset = 0, size = 0; | |
140 | while (offset < buffer.byteLength) { | |
141 | size = yield buffer.subarray(offset, offset += | |
142 | (isNaN(+size) ? buffer.byteLength - offset : size)); | |
143 | } | |
144 | } | |
145 | ||
146 | export async function concatBuffersAsync(iterator: AsyncIterable<Uint8Array> | ReadableStream) { | |
147 | if (iterator instanceof ReadableStream) { | |
148 | iterator = readableDOMStreamToAsyncIterator(iterator); | |
149 | } | |
150 | let chunks = [], total = 0; | |
151 | for await (const chunk of iterator) { | |
152 | chunks.push(chunk); | |
153 | total += chunk.byteLength; | |
154 | } | |
155 | return chunks.reduce((x, buffer) => { | |
156 | x.buffer.set(buffer, x.offset); | |
157 | x.offset += buffer.byteLength; | |
158 | return x; | |
159 | }, { offset: 0, buffer: new Uint8Array(total) }).buffer; | |
160 | } | |
161 | ||
162 | export async function* readableDOMStreamToAsyncIterator<T>(stream: ReadableStream<T>) { | |
163 | // Get a lock on the stream | |
164 | const reader = stream.getReader(); | |
165 | try { | |
166 | while (true) { | |
167 | // Read from the stream | |
168 | const { done, value } = await reader.read(); | |
169 | // Exit if we're done | |
170 | if (done) { break; } | |
171 | // Else yield the chunk | |
172 | yield value as T; | |
173 | } | |
174 | } finally { | |
175 | try { stream.locked && reader.releaseLock(); } catch (e) {} | |
176 | } | |
177 | } | |
178 | ||
179 | export function nodeToDOMStream<T = any>(stream: NodeJS.ReadableStream, opts: any = {}) { | |
180 | stream = new Readable((stream as any)._readableState).wrap(stream); | |
181 | return new ReadableStream<T>({ | |
182 | ...opts, | |
183 | start(controller) { | |
184 | stream.pause(); | |
185 | stream.on('data', (chunk) => { | |
186 | controller.enqueue(chunk); | |
187 | stream.pause(); | |
188 | }); | |
189 | stream.on('end', () => controller.close()); | |
190 | stream.on('error', e => controller.error(e)); | |
191 | }, | |
192 | pull() { stream.resume(); }, | |
193 | cancel(reason) { | |
194 | stream.pause(); | |
195 | if (typeof (stream as any).cancel === 'function') { | |
196 | return (stream as any).cancel(reason); | |
197 | } else if (typeof (stream as any).destroy === 'function') { | |
198 | return (stream as any).destroy(reason); | |
199 | } | |
200 | } | |
201 | }); | |
202 | } |