// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import '../../jest-extensions';

import {
    Table,
    RecordBatchWriter,
    RecordBatchFileWriter,
    RecordBatchJSONWriter,
    RecordBatchStreamWriter,
} from 'apache-arrow';

import * as fs from 'fs';
import { fs as memfs } from 'memfs';
import { Readable, PassThrough } from 'stream';
import randomatic from 'randomatic';

// Base class for the IPC test helpers. Each public method wraps a test body
// in an async function that first serializes this helper's Table through the
// subclass's RecordBatchWriter, then hands the bytes to the test in the
// requested form (buffer, iterable, stream, file handle, etc.).
export abstract class ArrowIOTestHelper {

    constructor(public table: Table) {}

    public static file(table: Table) { return new ArrowFileIOTestHelper(table); }
    public static json(table: Table) { return new ArrowJsonIOTestHelper(table); }
    public static stream(table: Table) { return new ArrowStreamIOTestHelper(table); }

    protected abstract writer(table: Table): RecordBatchWriter;
    // Serialize the table to a uniquely-named file in the in-memory
    // filesystem, returning the path so callers can unlink it afterwards.
    protected async filepath(table: Table): Promise<fs.PathLike> {
        const path = `/${randomatic('a0', 20)}.arrow`;
        const data = await this.writer(table).toUint8Array();
        await memfs.promises.writeFile(path, data);
        return path;
    }

    // Test against the serialized table as one contiguous Uint8Array.
    buffer(testFn: (buffer: Uint8Array) => void | Promise<void>) {
        return async () => {
            expect.hasAssertions();
            await testFn(await this.writer(this.table).toUint8Array());
        };
    }
    // Test against a synchronous generator that yields the bytes in chunks.
    iterable(testFn: (iterable: Generator<Uint8Array>) => void | Promise<void>) {
        return async () => {
            expect.hasAssertions();
            await testFn(chunkedIterable(await this.writer(this.table).toUint8Array()));
        };
    }
    // Test against an async generator that yields the bytes in chunks.
    asyncIterable(testFn: (asyncIterable: AsyncGenerator<Uint8Array>) => void | Promise<void>) {
        return async () => {
            expect.hasAssertions();
            await testFn(asyncChunkedIterable(await this.writer(this.table).toUint8Array()));
        };
    }
    // Test against an fs.promises-style FileHandle opened on the in-memory file.
    fsFileHandle(testFn: (handle: fs.promises.FileHandle) => void | Promise<void>) {
        return async () => {
            expect.hasAssertions();
            const path = await this.filepath(this.table);
            await testFn(<any> await memfs.promises.open(path, 'r'));
            await memfs.promises.unlink(path);
        };
    }
    // Test against a Node fs.ReadStream reading from the in-memory file.
    fsReadableStream(testFn: (stream: fs.ReadStream) => void | Promise<void>) {
        return async () => {
            expect.hasAssertions();
            const path = await this.filepath(this.table);
            await testFn(<any> memfs.createReadStream(path));
            await memfs.promises.unlink(path);
        };
    }
    // Test against a generic Node readable stream fed from a PassThrough.
    nodeReadableStream(testFn: (stream: NodeJS.ReadableStream) => void | Promise<void>) {
        return async () => {
            expect.hasAssertions();
            const sink = new PassThrough();
            sink.end(await this.writer(this.table).toUint8Array());
            await testFn(sink);
        };
    }
    // Test against a WHATWG ReadableStream adapted from a Node read stream.
    whatwgReadableStream(testFn: (stream: ReadableStream) => void | Promise<void>) {
        return async () => {
            expect.hasAssertions();
            const path = await this.filepath(this.table);
            await testFn(nodeToDOMStream(memfs.createReadStream(path)));
            await memfs.promises.unlink(path);
        };
    }
    // Same as above, but as a WHATWG byte stream ({ type: 'bytes' }).
    whatwgReadableByteStream(testFn: (stream: ReadableStream) => void | Promise<void>) {
        return async () => {
            expect.hasAssertions();
            const path = await this.filepath(this.table);
            await testFn(nodeToDOMStream(memfs.createReadStream(path), { type: 'bytes' }));
            await memfs.promises.unlink(path);
        };
    }
}
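
// A minimal usage sketch (an assumption for illustration, not part of this
// file: a jest test and a `table` constructed elsewhere, e.g. by the test
// data generators). Each helper method returns an async test body, so it
// plugs straight into `it(...)`:
//
//   import { RecordBatchReader } from 'apache-arrow';
//
//   const helper = ArrowIOTestHelper.file(table);
//   it('reads an Arrow file from a Uint8Array', helper.buffer(async (buffer) => {
//       const reader = await RecordBatchReader.from(buffer);
//       expect(reader.isFile()).toBe(true);
//   }));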

class ArrowFileIOTestHelper extends ArrowIOTestHelper {
    constructor(table: Table) { super(table); }
    protected writer(table: Table) {
        return RecordBatchFileWriter.writeAll(table);
    }
}

class ArrowJsonIOTestHelper extends ArrowIOTestHelper {
    constructor(table: Table) { super(table); }
    protected writer(table: Table) {
        return RecordBatchJSONWriter.writeAll(table);
    }
}

class ArrowStreamIOTestHelper extends ArrowIOTestHelper {
    constructor(table: Table) { super(table); }
    protected writer(table: Table) {
        return RecordBatchStreamWriter.writeAll(table);
    }
}

// Yield `buffer` in chunks. The caller can control the size of the next chunk
// by passing a byte count to `next(size)`; when no size is supplied, the rest
// of the buffer is yielded in one piece. Note the first chunk is always empty,
// since `size` starts at 0 (see the sketch after asyncChunkedIterable below).
export function* chunkedIterable(buffer: Uint8Array) {
    let offset = 0, size = 0;
    while (offset < buffer.byteLength) {
        size = yield buffer.subarray(offset, offset +=
            (isNaN(+size) ? buffer.byteLength - offset : size));
    }
}

// The async flavor of chunkedIterable, with the same size-feedback protocol.
export async function* asyncChunkedIterable(buffer: Uint8Array) {
    let offset = 0, size = 0;
    while (offset < buffer.byteLength) {
        size = yield buffer.subarray(offset, offset +=
            (isNaN(+size) ? buffer.byteLength - offset : size));
    }
}
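
// A quick sketch of the size-feedback protocol (hypothetical values, not part
// of the original file):
//
//   const chunks = chunkedIterable(new Uint8Array(8));
//   chunks.next();    // empty first chunk: `size` starts at 0
//   chunks.next(3);   // bytes 0..3
//   chunks.next(3);   // bytes 3..6
//   chunks.next();    // no size fed back -> the remaining 2 bytes
//
// Plain `for..of` iteration never feeds a size back, so it sees the empty
// chunk followed by the entire buffer.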

// Drain an async byte source (async iterable or WHATWG ReadableStream) and
// concatenate every chunk into a single Uint8Array.
export async function concatBuffersAsync(iterator: AsyncIterable<Uint8Array> | ReadableStream) {
    if (iterator instanceof ReadableStream) {
        iterator = readableDOMStreamToAsyncIterator(iterator);
    }
    const chunks: Uint8Array[] = [];
    let total = 0;
    for await (const chunk of iterator) {
        chunks.push(chunk);
        total += chunk.byteLength;
    }
    // Copy each chunk into one preallocated buffer of the combined length.
    return chunks.reduce((x, buffer) => {
        x.buffer.set(buffer, x.offset);
        x.offset += buffer.byteLength;
        return x;
    }, { offset: 0, buffer: new Uint8Array(total) }).buffer;
}
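
// Round-trip sketch (assumes an `input: Uint8Array` defined elsewhere): the
// chunking helpers above and this concat are inverses, modulo chunk
// boundaries:
//
//   const bytes = await concatBuffersAsync(asyncChunkedIterable(input));
//   expect(bytes).toEqual(input);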

export async function* readableDOMStreamToAsyncIterator<T>(stream: ReadableStream<T>) {
    // Get a lock on the stream
    const reader = stream.getReader();
    try {
        while (true) {
            // Read from the stream
            const { done, value } = await reader.read();
            // Exit if we're done
            if (done) { break; }
            // Else yield the chunk
            yield value as T;
        }
    } finally {
        try { stream.locked && reader.releaseLock(); } catch (e) {}
    }
}
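
// Consumption sketch (hypothetical `stream`): this adapter lets tests `for
// await` over a WHATWG stream even where ReadableStream has no native async
// iterator:
//
//   for await (const chunk of readableDOMStreamToAsyncIterator(stream)) {
//       // ... feed `chunk` to a RecordBatchReader, hash it, etc.
//   }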

// Wrap a Node readable stream in a WHATWG ReadableStream, preserving
// backpressure: the source is paused after every chunk and only resumed when
// the consumer pulls.
export function nodeToDOMStream<T = any>(stream: NodeJS.ReadableStream, opts: any = {}) {
    stream = new Readable((stream as any)._readableState).wrap(stream);
    return new ReadableStream<T>({
        ...opts,
        start(controller) {
            stream.pause();
            stream.on('data', (chunk) => {
                controller.enqueue(chunk);
                stream.pause();
            });
            stream.on('end', () => controller.close());
            stream.on('error', e => controller.error(e));
        },
        pull() { stream.resume(); },
        cancel(reason) {
            stream.pause();
            if (typeof (stream as any).cancel === 'function') {
                return (stream as any).cancel(reason);
            } else if (typeof (stream as any).destroy === 'function') {
                return (stream as any).destroy(reason);
            }
        }
    });
}
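
// Usage sketch (assumes `path` points at a file previously written to memfs):
//
//   const domStream = nodeToDOMStream(memfs.createReadStream(path));
//   const byteStream = nodeToDOMStream(memfs.createReadStream(path), { type: 'bytes' });
//
// The whatwg* helpers above use exactly this to exercise the WHATWG stream
// code paths of the IPC readers against the same serialized bytes as the
// Node paths.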