diff --git a/ceph/src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts b/ceph/src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts
new file mode 100644
index 0000000..a83aa39
--- /dev/null
+++ b/ceph/src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+    generateRandomTables,
+    generateDictionaryTables
+} from '../../../data/tables';
+
+import * as generate from '../../../generate-test-data';
+import { validateRecordBatchIterator } from '../validate';
+import { RecordBatchStreamWriterOptions } from 'apache-arrow/ipc/writer';
+import { DictionaryVector, Dictionary, Uint32, Int32 } from 'apache-arrow';
+import { Table, Schema, Field, Chunked, Builder, RecordBatch, RecordBatchReader, RecordBatchStreamWriter } from 'apache-arrow';
+
+describe('RecordBatchStreamWriter', () => {
+
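+    // Round-trip a table with a dictionary-encoded sparse union column,
+    // in both the legacy and current IPC metadata formats.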
+    (() => {
+        const type = generate.sparseUnion(0, 0).vector.type;
+        const schema = new Schema([new Field('dictSparseUnion', type)]);
+        const table = generate.table([10, 20, 30], schema).table;
+        const testName = `[${table.schema.fields.join(', ')}]`;
+        testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
+        testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
+    })();
+
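+    // Round-trip a variety of randomly generated tables in both IPC formats.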
+    for (const table of generateRandomTables([10, 20, 30])) {
+        const testName = `[${table.schema.fields.join(', ')}]`;
+        testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
+        testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
+    }
+
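+    // Round-trip dictionary-encoded tables in both IPC formats.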
+    for (const table of generateDictionaryTables([10, 20, 30])) {
+        const testName = `${table.schema.fields[0]}`;
+        testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
+        testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
+    }
+
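+    // With `autoDestroy: false` the writer stays open after `writeAll`,
+    // so several tables can share one output stream.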
+    it(`should write multiple tables to the same output stream`, async () => {
+        const tables = [] as Table[];
+        const writer = new RecordBatchStreamWriter({ autoDestroy: false });
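+        // Concurrently read each table back off the stream and compare it
+        // against the source table that produced it.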
+        const validate = (async () => {
+            for await (const reader of RecordBatchReader.readAll(writer)) {
+                const sourceTable = tables.shift()!;
+                const streamTable = await Table.from(reader);
+                expect(streamTable).toEqualTable(sourceTable);
+            }
+        })();
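+        // Write each table as an async chunk stream so reads and writes interleave.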
+        for (const table of generateRandomTables([10, 20, 30])) {
+            tables.push(table);
+            await writer.writeAll((async function*() {
+                for (const chunk of table.chunks) {
+                    yield chunk; // insert some asynchrony
+                    await new Promise((r) => setTimeout(r, 1));
+                }
+            }()));
+        }
+        writer.close();
+        await validate;
+    });
+
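+    // A dictionary column split across record batches should round-trip as an
+    // initial dictionary batch followed by delta dictionary batches.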
+    it('should write delta dictionary batches', async () => {
+
+        const name = 'dictionary_encoded_uint32';
+        const chunks: DictionaryVector<Uint32, Int32>[] = [];
+        const {
+            vector: sourceVector, values: sourceValues,
+        } = generate.dictionary(1000, 20, new Uint32(), new Int32());
+
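+        // Re-chunk the 1000 source values into batches of 50 via a builder,
+        // producing 20 record batches that share a single dictionary.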
+        const writer = RecordBatchStreamWriter.writeAll((function* () {
+            const transform = Builder.throughIterable({
+                type: sourceVector.type, nullValues: [null],
+                queueingStrategy: 'count', highWaterMark: 50,
+            });
+            for (const chunk of transform(sourceValues())) {
+                chunks.push(chunk);
+                yield RecordBatch.new({ [name]: chunk });
+            }
+        })());
+
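+        // The re-chunked values should still equal the source vector.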
+        expect(Chunked.concat(chunks)).toEqualVector(sourceVector);
+
+        type T = { [name]: Dictionary<Uint32, Int32> };
+        const sourceTable = Table.new({ [name]: sourceVector });
+        const resultTable = await Table.from<T>(writer.toUint8Array());
+
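+        // Reading the stream back should reassemble the dictionary from its
+        // delta batches into a Chunked vector with 20 chunks.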
+        const { dictionary } = resultTable.getColumn(name);
+
+        expect(resultTable).toEqualTable(sourceTable);
+        expect((dictionary as Chunked)).toBeInstanceOf(Chunked);
+        expect((dictionary as Chunked).chunks).toHaveLength(20);
+    });
+});
+
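+// Registers a round-trip test for the given table under the given writer options.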
+function testStreamWriter(table: Table, name: string, options: RecordBatchStreamWriterOptions) {
+    describe(`should write the Arrow IPC stream format (${name})`, () => {
+        test(`Table`, validateTable.bind(0, table, options));
+    });
+}
+
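+// Writes the table to an in-memory IPC stream, reads it back, and verifies equality.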
+async function validateTable(source: Table, options: RecordBatchStreamWriterOptions) {
+    const writer = RecordBatchStreamWriter.writeAll(source, options);
+    const result = await Table.from(writer.toUint8Array());
+    validateRecordBatchIterator(3, source.chunks);
+    expect(result).toEqualTable(source);
+}