]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | import { Vector } from './vector'; | |
19 | import { BufferType } from './enum'; | |
20 | import { Data, Buffers } from './data'; | |
21 | import { createIsValidFunction } from './builder/valid'; | |
22 | import { BuilderType as B, VectorType as V} from './interfaces'; | |
23 | import { BufferBuilder, BitmapBufferBuilder, DataBufferBuilder, OffsetsBufferBuilder } from './builder/buffer'; | |
24 | import { | |
25 | DataType, strideForType, | |
26 | Float, Int, Decimal, FixedSizeBinary, | |
27 | Date_, Time, Timestamp, Interval, | |
28 | Utf8, Binary, List, Map_ | |
29 | } from './type'; | |
30 | ||
31 | /** | |
32 | * A set of options required to create a `Builder` instance for a given `DataType`. | |
33 | * @see {@link Builder} | |
34 | */ | |
export interface BuilderOptions<T extends DataType = any, TNull = any> {
    /** The Arrow `DataType` instance the `Builder` is created for. This is the only required option. */
    type: T;
    /**
     * Optional sentinel values. When present and non-empty, the `Builder` constructor
     * uses them to build the predicate that decides whether a written value is null.
     */
    nullValues?: ReadonlyArray<TNull> | TNull[] | null;
    /**
     * Optional options for child builders, supplied either positionally (array) or by key (object).
     * NOTE(review): not read in this file; presumably consumed by nested-type builders -- confirm.
     */
    children?: BuilderOptions[] | { [key: string]: BuilderOptions };
}
40 | ||
41 | /** | |
42 | * A set of options to create an Iterable or AsyncIterable `Builder` transform function. | |
43 | * @see {@link Builder.throughIterable} | |
44 | * @see {@link Builder.throughAsyncIterable} | |
45 | */ | |
46 | ||
export interface IterableBuilderOptions<T extends DataType = any, TNull = any> extends BuilderOptions<T, TNull> {
    /**
     * Size threshold at which the iterable transforms flush the `Builder` and yield a Vector.
     * When omitted, the transforms in this file default to `1000` for the `'count'`
     * strategy and `2 ** 14` for the `'bytes'` strategy.
     */
    highWaterMark?: number;
    /**
     * Which `Builder` size is compared against `highWaterMark`: its `length`
     * (`'count'`, the default) or its `byteLength` (`'bytes'`).
     */
    queueingStrategy?: 'count' | 'bytes';
    /**
     * NOTE(review): not read in this file; presumably forwarded to dictionary builders
     * to hash values -- confirm against the consumer.
     */
    dictionaryHashFunction?: (value: any) => string | number;
    /**
     * NOTE(review): not read in this file; presumably forwarded to union builders to
     * pick the child type id for a value -- confirm against the consumer.
     */
    valueToChildTypeId?: (builder: Builder<T, TNull>, value: any, offset: number) => number;
}
53 | ||
54 | /** | |
55 | * An abstract base class for types that construct Arrow Vectors from arbitrary JavaScript values. | |
56 | * | |
57 | * A `Builder` is responsible for writing arbitrary JavaScript values | |
58 | * to ArrayBuffers and/or child Builders according to the Arrow specification | |
59 | * for each DataType, creating or resizing the underlying ArrayBuffers as necessary. | |
60 | * | |
61 | * The `Builder` for each Arrow `DataType` handles converting and appending | |
62 | * values for a given `DataType`. The high-level {@link Builder.new `Builder.new()`} convenience | |
63 | * method creates the specific `Builder` subclass for the supplied `DataType`. | |
64 | * | |
65 | * Once created, `Builder` instances support both appending values to the end | |
66 | * of the `Builder`, and random-access writes to specific indices | |
67 | * (`Builder.prototype.append(value)` is a convenience method for | |
68 | * `builder.set(builder.length, value)`). Appending or setting values beyond the | |
69 | * Builder's current length may cause the builder to grow its underlying buffers | |
70 | * or child Builders (if applicable) to accommodate the new values. | |
71 | * | |
72 | * After enough values have been written to a `Builder`, `Builder.prototype.flush()` | |
73 | * will commit the values to the underlying ArrayBuffers (or child Builders). The | |
74 | * internal Builder state will be reset, and an instance of `Data<T>` is returned. | |
75 | * Alternatively, `Builder.prototype.toVector()` will flush the `Builder` and return | |
76 | * an instance of `Vector<T>` instead. | |
77 | * | |
78 | * When there are no more values to write, use `Builder.prototype.finish()` to | |
79 | * finalize the `Builder`. This does not reset the internal state, so it is | |
80 | * necessary to call `Builder.prototype.flush()` or `toVector()` one last time | |
81 | * if there are still values queued to be flushed. | |
82 | * | |
83 | * Note: calling `Builder.prototype.finish()` is required when using a `DictionaryBuilder`, | |
84 | * because this is when it flushes the values that have been enqueued in its internal | |
85 | * dictionary's `Builder`, and creates the `dictionaryVector` for the `Dictionary` `DataType`. | |
86 | * | |
87 | * ```ts | |
88 | * import { Builder, Utf8 } from 'apache-arrow'; | |
89 | * | |
90 | * const utf8Builder = Builder.new({ | |
91 | * type: new Utf8(), | |
92 | * nullValues: [null, 'n/a'] | |
93 | * }); | |
94 | * | |
95 | * utf8Builder | |
96 | * .append('hello') | |
97 | * .append('n/a') | |
98 | * .append('world') | |
99 | * .append(null); | |
100 | * | |
101 | * const utf8Vector = utf8Builder.finish().toVector(); | |
102 | * | |
103 | * console.log(utf8Vector.toJSON()); | |
104 | * // > ["hello", null, "world", null] | |
105 | * ``` | |
106 | * | |
107 | * @typeparam T The `DataType` of this `Builder`. | |
108 | * @typeparam TNull The type(s) of values which will be considered null-value sentinels. | |
109 | */ | |
export abstract class Builder<T extends DataType = any, TNull = any> {

    /**
     * Create a `Builder` instance based on the `type` property of the supplied `options` object.
     *
     * The implementation declared here is only a placeholder: like `throughNode` and
     * `throughDOM` below, it throws instead of silently returning `undefined`.
     * NOTE(review): a concrete factory is presumably assigned to `Builder.new`
     * elsewhere in the package -- confirm.
     *
     * @param {BuilderOptions<T, TNull>} options An object with a required `DataType` instance
     * and other optional parameters to be passed to the `Builder` subclass for the given `type`.
     * @throws {Error} When called while this placeholder is still in place.
     *
     * @typeparam T The `DataType` of the `Builder` to create.
     * @typeparam TNull The type(s) of values which will be considered null-value sentinels.
     * @nocollapse
     */
    // @ts-ignore
    public static new<T extends DataType = any, TNull = any>(options: BuilderOptions<T, TNull>): B<T, TNull> {
        throw new Error(`"new" not available in this environment`);
    }

    /**
     * Placeholder that always throws in this file.
     * @throws {Error} Always.
     * @nocollapse
     */
    // @ts-ignore
    public static throughNode<T extends DataType = any, TNull = any>(options: import('./io/node/builder').BuilderDuplexOptions<T, TNull>): import('stream').Duplex {
        throw new Error(`"throughNode" not available in this environment`);
    }

    /**
     * Placeholder that always throws in this file.
     * @throws {Error} Always.
     * @nocollapse
     */
    // @ts-ignore
    public static throughDOM<T extends DataType = any, TNull = any>(options: import('./io/whatwg/builder').BuilderTransformOptions<T, TNull>): import('./io/whatwg/builder').BuilderTransform<T, TNull> {
        throw new Error(`"throughDOM" not available in this environment`);
    }

    /**
     * Transform a synchronous `Iterable` of arbitrary JavaScript values into a
     * sequence of Arrow Vector<T> following the chunking semantics defined in
     * the supplied `options` argument.
     *
     * This function returns a function that accepts an `Iterable` of values to
     * transform. When called, this function returns an Iterator of `Vector<T>`.
     *
     * The resulting `Iterator<Vector<T>>` yields Vectors based on the
     * `queueingStrategy` and `highWaterMark` specified in the `options` argument.
     *
     * * If `queueingStrategy` is `"count"` (or omitted), the `Iterator<Vector<T>>`
     *   will flush the underlying `Builder` (and yield a new `Vector<T>`) once the
     *   Builder's `length` reaches or exceeds the supplied `highWaterMark`.
     * * If `queueingStrategy` is `"bytes"`, the `Iterator<Vector<T>>` will flush
     *   the underlying `Builder` (and yield a new `Vector<T>`) once its `byteLength`
     *   reaches or exceeds the supplied `highWaterMark`.
     *
     * @param {IterableBuilderOptions<T, TNull>} options An object of properties which determine the `Builder` to create and the chunking semantics to use.
     * @returns A function which accepts a JavaScript `Iterable` of values to
     *          write, and returns an `Iterator` that yields Vectors according
     *          to the chunking semantics defined in the `options` argument.
     * @nocollapse
     */
    public static throughIterable<T extends DataType = any, TNull = any>(options: IterableBuilderOptions<T, TNull>) {
        return throughIterable(options);
    }

    /**
     * Transform an `AsyncIterable` of arbitrary JavaScript values into a
     * sequence of Arrow Vector<T> following the chunking semantics defined in
     * the supplied `options` argument.
     *
     * This function returns a function that accepts an `AsyncIterable` of values to
     * transform. When called, this function returns an AsyncIterator of `Vector<T>`.
     *
     * The resulting `AsyncIterator<Vector<T>>` yields Vectors based on the
     * `queueingStrategy` and `highWaterMark` specified in the `options` argument.
     *
     * * If `queueingStrategy` is `"count"` (or omitted), the `AsyncIterator<Vector<T>>`
     *   will flush the underlying `Builder` (and yield a new `Vector<T>`) once the
     *   Builder's `length` reaches or exceeds the supplied `highWaterMark`.
     * * If `queueingStrategy` is `"bytes"`, the `AsyncIterator<Vector<T>>` will flush
     *   the underlying `Builder` (and yield a new `Vector<T>`) once its `byteLength`
     *   reaches or exceeds the supplied `highWaterMark`.
     *
     * @param {IterableBuilderOptions<T, TNull>} options An object of properties which determine the `Builder` to create and the chunking semantics to use.
     * @returns A function which accepts a JavaScript `AsyncIterable` of values
     *          to write, and returns an `AsyncIterator` that yields Vectors
     *          according to the chunking semantics defined in the `options`
     *          argument.
     * @nocollapse
     */
    public static throughAsyncIterable<T extends DataType = any, TNull = any>(options: IterableBuilderOptions<T, TNull>) {
        return throughAsyncIterable(options);
    }

    /**
     * Construct a builder with the given Arrow DataType with optional null values,
     * which will be interpreted as "null" when set or appended to the `Builder`.
     * @param {{ type: T, nullValues?: any[] }} options A `BuilderOptions` object used to create this `Builder`.
     */
    constructor({ 'type': type, 'nullValues': nulls }: BuilderOptions<T, TNull>) {
        this.type = type;
        this.children = [];
        this.nullValues = nulls;
        this.stride = strideForType(type);
        this._nulls = new BitmapBufferBuilder();
        // Only install a custom validity predicate when sentinels were supplied;
        // otherwise whatever `_isValid` resolves to on the prototype chain is used.
        if (nulls && nulls.length > 0) {
            this._isValid = createIsValidFunction(nulls);
        }
    }

    /**
     * The Builder's `DataType` instance.
     * @readonly
     */
    public type: T;
    /**
     * The number of values written to the `Builder` that haven't been flushed yet.
     * @readonly
     */
    public length = 0;
    /**
     * A boolean indicating whether `Builder.prototype.finish()` has been called on this `Builder`.
     * @readonly
     */
    public finished = false;
    /**
     * The number of elements in the underlying values TypedArray that
     * represent a single logical element, determined by this Builder's
     * `DataType`. This is 1 for most types, but is larger when the `DataType`
     * is `Int64`, `Uint64`, `Decimal`, `DateMillisecond`, certain variants of
     * `Interval`, `Time`, or `Timestamp`, `FixedSizeBinary`, and `FixedSizeList`.
     * @readonly
     */
    public readonly stride: number;
    /**
     * The child `Builder` instances of this `Builder`. Initialized to an empty
     * array by the constructor.
     * @readonly
     */
    public readonly children: Builder[];
    /**
     * The list of null-value sentinels for this `Builder`. When one of these values
     * is written to the `Builder` (either via `Builder.prototype.set()` or `Builder.prototype.append()`),
     * the slot is marked invalid: a 0-bit is written to this Builder's underlying
     * validity `BitmapBufferBuilder` and the value itself is not stored.
     * @readonly
     */
    public readonly nullValues?: TNull[] | ReadonlyArray<TNull> | null;

    /**
     * Flush the `Builder` and return a `Vector<T>`.
     * @returns {Vector<T>} A `Vector<T>` of the flushed values.
     */
    public toVector() { return Vector.new(this.flush()); }

    /** The `ArrayType` of this Builder's `DataType`. */
    public get ArrayType() { return this.type.ArrayType; }
    /** The number of invalid (null) slots reported by the validity bitmap builder. */
    public get nullCount() { return this._nulls.numInvalid; }
    /** The number of child `Builder` instances. */
    public get numChildren() { return this.children.length; }

    /**
     * @returns The aggregate length (in bytes) of the values that have been written.
     */
    public get byteLength(): number {
        let ownBytes = 0;
        if (this._offsets) { ownBytes += this._offsets.byteLength; }
        if (this._values) { ownBytes += this._values.byteLength; }
        if (this._nulls) { ownBytes += this._nulls.byteLength; }
        if (this._typeIds) { ownBytes += this._typeIds.byteLength; }
        return this.children.reduce((total, child) => total + child.byteLength, ownBytes);
    }

    /**
     * @returns The aggregate number of rows that have been reserved to write new values.
     */
    public get reservedLength(): number {
        return this._nulls.reservedLength;
    }

    /**
     * @returns The aggregate length (in bytes) that has been reserved to write new values.
     */
    public get reservedByteLength(): number {
        let ownBytes = 0;
        if (this._offsets) { ownBytes += this._offsets.reservedByteLength; }
        if (this._values) { ownBytes += this._values.reservedByteLength; }
        if (this._nulls) { ownBytes += this._nulls.reservedByteLength; }
        if (this._typeIds) { ownBytes += this._typeIds.reservedByteLength; }
        return this.children.reduce((total, child) => total + child.reservedByteLength, ownBytes);
    }

    protected _offsets!: DataBufferBuilder<Int32Array>;
    /** The current offsets buffer, or null if this `Builder` has no offsets builder. */
    public get valueOffsets() { return this._offsets ? this._offsets.buffer : null; }

    protected _values!: BufferBuilder<T['TArray'], any>;
    /** The current values buffer, or null if this `Builder` has no values builder. */
    public get values() { return this._values ? this._values.buffer : null; }

    protected _nulls: BitmapBufferBuilder;
    /** The current validity bitmap buffer, or null if there is no bitmap builder. */
    public get nullBitmap() { return this._nulls ? this._nulls.buffer : null; }

    protected _typeIds!: DataBufferBuilder<Int8Array>;
    /** The current type-ids buffer, or null if this `Builder` has no type-ids builder. */
    public get typeIds() { return this._typeIds ? this._typeIds.buffer : null; }

    protected _isValid!: (value: T['TValue'] | TNull) => boolean;
    protected _setValue!: (inst: Builder<T>, index: number, value: T['TValue']) => void;

    /**
     * Appends a value (or null) to this `Builder`.
     * This is equivalent to `builder.set(builder.length, value)`.
     * @param {T['TValue'] | TNull } value The value to append.
     * @returns {this} The updated `Builder` instance.
     */
    public append(value: T['TValue'] | TNull) { return this.set(this.length, value); }

    /**
     * Validates whether a value is valid (true), or null (false).
     * @param {T['TValue'] | TNull } value The value to compare against the null-value representations.
     * @returns {boolean} `true` if the value is valid, `false` if it is considered null.
     */
    public isValid(value: T['TValue'] | TNull): boolean { return this._isValid(value); }

    /**
     * Write a value (or null-value sentinel) at the supplied index.
     * If the value matches one of the null-value representations, a 0-bit
     * (invalid) is written to the validity `BitmapBufferBuilder` and the value
     * is not stored. Otherwise, a 1-bit (valid) is written to the validity
     * `BitmapBufferBuilder`, and the value is passed to
     * `Builder.prototype.setValue()`.
     * @param {number} index The index of the value to write.
     * @param {T['TValue'] | TNull } value The value to write at the supplied index.
     * @returns {this} The updated `Builder` instance.
     */
    public set(index: number, value: T['TValue'] | TNull) {
        if (this.setValid(index, this.isValid(value))) {
            this.setValue(index, value);
        }
        return this;
    }

    /**
     * Write a value to the underlying buffers at the supplied index, bypassing
     * the null-value check. This is a low-level method that forwards directly to
     * the Builder's `_setValue` function; it does not itself write a validity bit.
     * @param {number} index The index of the value to write.
     * @param {T['TValue'] | TNull } value The value to write at the supplied index.
     */
    public setValue(index: number, value: T['TValue']) { this._setValue(this, index, value); }

    /**
     * Write the validity bit for the supplied index (1 when `valid` is true,
     * 0 otherwise) and update this Builder's `length` from the bitmap builder.
     * @param {number} index The index of the slot to mark.
     * @param {boolean} valid Whether the slot holds a valid (non-null) value.
     * @returns {boolean} The `valid` argument, unchanged.
     */
    public setValid(index: number, valid: boolean) {
        this.length = this._nulls.set(index, +valid).length;
        return valid;
    }

    /**
     * Base implementation for non-nested types: always throws.
     * @throws {Error} Always.
     */
    // @ts-ignore
    public addChild(child: Builder, name = `${this.numChildren}`) {
        throw new Error(`Cannot append children to non-nested type "${this.type}"`);
    }

    /**
     * Retrieve the child `Builder` at the supplied `index`, or null if no child
     * exists at that index.
     * @param {number} index The index of the child `Builder` to retrieve.
     * @returns {Builder | null} The child Builder at the supplied index or null.
     */
    public getChildAt<R extends DataType = any>(index: number): Builder<R> | null {
        return this.children[index] || null;
    }

    /**
     * Commit all the values that have been written to their underlying
     * ArrayBuffers, including any child Builders if applicable, and reset
     * the internal `Builder` state.
     * @returns A `Data<T>` of the buffers and childData representing the values written.
     */
    public flush() {

        const buffers: any = [];
        const values = this._values;
        const offsets = this._offsets;
        const typeIds = this._typeIds;
        const { length, nullCount } = this;

        if (typeIds) { /* Unions */
            buffers[BufferType.TYPE] = typeIds.flush(length);
            // DenseUnions
            if (offsets) { buffers[BufferType.OFFSET] = offsets.flush(length); }
        } else if (offsets) { /* Variable-width primitives (Binary, Utf8) and Lists */
            // Binary, Utf8
            if (values) { buffers[BufferType.DATA] = values.flush(offsets.last()); }
            buffers[BufferType.OFFSET] = offsets.flush(length);
        } else if (values) { /* Fixed-width primitives (Int, Float, Decimal, Time, Timestamp, and Interval) */
            buffers[BufferType.DATA] = values.flush(length);
        }

        // The validity bitmap is only emitted when at least one slot is null.
        if (nullCount > 0) {
            buffers[BufferType.VALIDITY] = this._nulls.flush(length);
        }

        const data = Data.new<T>(
            this.type, 0, length, nullCount, buffers as Buffers<T>,
            this.children.map((child) => child.flush())) as Data<T>;

        this.clear();

        return data;
    }

    /**
     * Finalize this `Builder`, and child builders if applicable.
     * @returns {this} The finalized `Builder` instance.
     */
    public finish() {
        this.finished = true;
        this.children.forEach((child) => child.finish());
        return this;
    }

    /**
     * Clear this Builder's internal state, including child Builders if applicable, and reset the length to 0.
     * @returns {this} The cleared `Builder` instance.
     */
    public clear() {
        this.length = 0;
        if (this._offsets) { this._offsets.clear(); }
        if (this._values) { this._values.clear(); }
        if (this._nulls) { this._nulls.clear(); }
        if (this._typeIds) { this._typeIds.clear(); }
        this.children.forEach((child) => child.clear());
        return this;
    }
}
415 | ||
// Install prototype-level defaults so the properties resolve even when an
// instance has not assigned them itself.
// NOTE(review): the prototype default for `length` is 1 while the class body
// initializes instance `length` to 0 -- confirm this is intentional.
{
    const builderProto = Builder.prototype as any;
    const prototypeDefaults: [string, any][] = [
        ['length', 1],
        ['stride', 1],
        ['children', null],
        ['finished', false],
        ['nullValues', null],
        // Without null sentinels, every value is treated as valid.
        ['_isValid', () => true],
    ];
    for (const [key, value] of prototypeDefaults) {
        builderProto[key] = value;
    }
}
422 | ||
423 | /** @ignore */ | |
export abstract class FixedWidthBuilder<T extends Int | Float | FixedSizeBinary | Date_ | Timestamp | Time | Decimal | Interval = any, TNull = any> extends Builder<T, TNull> {
    /**
     * Creates the builder and gives it a values buffer builder backed by a
     * zero-length array of the type's `ArrayType`, using this Builder's `stride`.
     * @param {BuilderOptions<T, TNull>} opts The options forwarded to the base `Builder`.
     */
    constructor(opts: BuilderOptions<T, TNull>) {
        super(opts);
        const emptyValues = new this.ArrayType(0);
        this._values = new DataBufferBuilder(emptyValues, this.stride);
    }

    /**
     * Asks the values buffer to reserve room up to and including `index`, then
     * delegates the actual write to the base `setValue`.
     * @param {number} index The index of the value to write.
     * @param {T['TValue']} value The value to write at the supplied index.
     */
    public setValue(index: number, value: T['TValue']) {
        const buffer = this._values;
        // Number of slots between the buffer's current length and `index`, inclusive.
        const extraSlots = index - buffer.length + 1;
        buffer.reserve(extraSlots);
        return super.setValue(index, value);
    }
}
435 | ||
436 | /** @ignore */ | |
export abstract class VariableWidthBuilder<T extends Binary | Utf8 | List | Map_, TNull = any> extends Builder<T, TNull> {
    /** Sum of the `length`s of the values currently queued in `_pending`. */
    protected _pendingLength = 0;
    protected _offsets: OffsetsBufferBuilder;
    /** Values queued by index until the next `_flush()`; null slots are queued as `undefined`. */
    protected _pending: Map<number, any> | undefined;

    /**
     * @param {BuilderOptions<T, TNull>} opts The options forwarded to the base `Builder`.
     */
    constructor(opts: BuilderOptions<T, TNull>) {
        super(opts);
        this._offsets = new OffsetsBufferBuilder();
    }

    /**
     * Queue a value at `index` instead of writing it immediately, keeping
     * `_pendingLength` in sync when an already-queued value is replaced.
     * @param {number} index The index of the value to write.
     * @param {T['TValue']} value The value to queue at the supplied index.
     */
    public setValue(index: number, value: T['TValue']) {
        if (!this._pending) {
            this._pending = new Map();
        }
        const queue = this._pending;
        const previous = queue.get(index);
        if (previous) {
            // Replacing a queued value: drop its contribution first.
            this._pendingLength -= previous.length;
        }
        this._pendingLength += value.length;
        queue.set(index, value);
    }

    /**
     * Record the validity bit via the base class; for a null slot, also queue
     * `undefined` at that index.
     * @param {number} index The index of the slot to mark.
     * @param {boolean} isValid Whether the slot holds a valid (non-null) value.
     * @returns {boolean} `true` for a valid slot, `false` for a null slot.
     */
    public setValid(index: number, isValid: boolean) {
        if (super.setValid(index, isValid)) {
            return true;
        }
        if (!this._pending) {
            this._pending = new Map();
        }
        this._pending.set(index, undefined);
        return false;
    }

    /**
     * Discard all queued values, then clear the base `Builder` state.
     * @returns {this} The cleared `Builder` instance.
     */
    public clear() {
        this._pendingLength = 0;
        this._pending = undefined;
        return super.clear();
    }

    /**
     * Write out any queued values, then flush the base `Builder`.
     * @returns The `Data<T>` returned by the base `flush()`.
     */
    public flush() {
        this._flush();
        return super.flush();
    }

    /**
     * Write out any queued values, then finish the base `Builder`.
     * @returns {this} The finalized `Builder` instance.
     */
    public finish() {
        this._flush();
        return super.finish();
    }

    /**
     * Detach the queue, reset the pending state, and hand a non-empty queue to
     * `_flushPending()`.
     * @returns {this} This `Builder` instance.
     */
    protected _flush() {
        const queued = this._pending;
        const queuedLength = this._pendingLength;
        // Reset before delegating so the subclass hook starts from a clean queue.
        this._pendingLength = 0;
        this._pending = undefined;
        if (queued && queued.size > 0) {
            this._flushPending(queued, queuedLength);
        }
        return this;
    }

    /**
     * Subclass hook that writes the queued values to the underlying buffers.
     * @param {Map<number, any>} pending The queued values keyed by index.
     * @param {number} pendingLength The `_pendingLength` captured when the queue was detached.
     */
    protected abstract _flushPending(pending: Map<number, any>, pendingLength: number): void;
}
484 | ||
485 | /** @ignore */ | |
type ThroughIterable<T extends DataType = any, TNull = any> = (source: Iterable<T['TValue'] | TNull>) => IterableIterator<V<T>>;

/**
 * Build a generator function that feeds every value of a synchronous source
 * into a fresh `Builder` and yields a Vector each time the Builder's size
 * reaches the high-water mark. Remaining values are yielded after the source
 * is exhausted; an empty Vector is yielded if nothing was yielded before.
 * @ignore
 */
function throughIterable<T extends DataType = any, TNull = any>(options: IterableBuilderOptions<T, TNull>) {
    // Quoted keys are kept, matching how options are accessed elsewhere in this file.
    const requestedStrategy = options['queueingStrategy'];
    const queueingStrategy = requestedStrategy !== undefined ? requestedStrategy : 'count';
    const measureBytes = queueingStrategy === 'bytes';
    const requestedMark = options['highWaterMark'];
    const highWaterMark = requestedMark !== undefined ? requestedMark : (measureBytes ? 2 ** 14 : 1000);
    const sizeProperty: 'length' | 'byteLength' = measureBytes ? 'byteLength' : 'length';
    return function*(source: Iterable<T['TValue'] | TNull>) {
        let numChunks = 0;
        const builder = Builder.new(options);
        for (const value of source) {
            const currentSize = builder.append(value)[sizeProperty];
            if (currentSize >= highWaterMark) {
                numChunks += 1;
                yield builder.toVector();
            }
        }
        const remaining = builder.finish().length;
        if (remaining > 0 || numChunks === 0) {
            yield builder.toVector();
        }
    } as ThroughIterable<T, TNull>;
}
506 | ||
507 | /** @ignore */ | |
type ThroughAsyncIterable<T extends DataType = any, TNull = any> = (source: Iterable<T['TValue'] | TNull> | AsyncIterable<T['TValue'] | TNull>) => AsyncIterableIterator<V<T>>;

/**
 * Build an async generator function that feeds every value of a synchronous
 * or asynchronous source into a fresh `Builder` and yields a Vector each time
 * the Builder's size reaches the high-water mark. Remaining values are yielded
 * after the source is exhausted; an empty Vector is yielded if nothing was
 * yielded before.
 * @ignore
 */
function throughAsyncIterable<T extends DataType = any, TNull = any>(options: IterableBuilderOptions<T, TNull>) {
    // Quoted keys are kept, matching how options are accessed elsewhere in this file.
    const requestedStrategy = options['queueingStrategy'];
    const queueingStrategy = requestedStrategy !== undefined ? requestedStrategy : 'count';
    const measureBytes = queueingStrategy === 'bytes';
    const requestedMark = options['highWaterMark'];
    const highWaterMark = requestedMark !== undefined ? requestedMark : (measureBytes ? 2 ** 14 : 1000);
    const sizeProperty: 'length' | 'byteLength' = measureBytes ? 'byteLength' : 'length';
    return async function* (source: Iterable<T['TValue'] | TNull> | AsyncIterable<T['TValue'] | TNull>) {
        let numChunks = 0;
        const builder = Builder.new(options);
        for await (const value of source) {
            const currentSize = builder.append(value)[sizeProperty];
            if (currentSize >= highWaterMark) {
                numChunks += 1;
                yield builder.toVector();
            }
        }
        const remaining = builder.finish().length;
        if (remaining > 0 || numChunks === 0) {
            yield builder.toVector();
        }
    } as ThroughAsyncIterable<T, TNull>;
}