2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org
.apache
.arrow
.dataset
.jni
;
20 import java
.util
.List
;
21 import java
.util
.concurrent
.atomic
.AtomicLong
;
23 import org
.apache
.arrow
.dataset
.ParquetWriteSupport
;
24 import org
.apache
.arrow
.dataset
.TestDataset
;
25 import org
.apache
.arrow
.dataset
.file
.FileFormat
;
26 import org
.apache
.arrow
.dataset
.file
.FileSystemDatasetFactory
;
27 import org
.apache
.arrow
.dataset
.scanner
.ScanOptions
;
28 import org
.apache
.arrow
.util
.AutoCloseables
;
29 import org
.apache
.arrow
.vector
.ipc
.message
.ArrowRecordBatch
;
30 import org
.junit
.Assert
;
31 import org
.junit
.ClassRule
;
32 import org
.junit
.Test
;
33 import org
.junit
.rules
.TemporaryFolder
;
35 public class TestReservationListener
extends TestDataset
{
38 public static final TemporaryFolder TMP
= new TemporaryFolder();
40 public static final String AVRO_SCHEMA_USER
= "user.avsc";
43 public void testDirectReservationListener() throws Exception
{
44 ParquetWriteSupport writeSupport
= ParquetWriteSupport
.writeTempFile(AVRO_SCHEMA_USER
, TMP
.newFolder(), 1, "a");
45 NativeMemoryPool pool
= NativeMemoryPool
.createListenable(DirectReservationListener
.instance());
46 FileSystemDatasetFactory factory
= new FileSystemDatasetFactory(rootAllocator(),
47 pool
, FileFormat
.PARQUET
,
48 writeSupport
.getOutputURI());
49 ScanOptions options
= new ScanOptions(100);
50 long initReservation
= DirectReservationListener
.instance().getCurrentDirectMemReservation();
51 List
<ArrowRecordBatch
> datum
= collectResultFromFactory(factory
, options
);
52 long reservation
= DirectReservationListener
.instance().getCurrentDirectMemReservation();
53 AutoCloseables
.close(datum
);
54 AutoCloseables
.close(pool
);
55 long finalReservation
= DirectReservationListener
.instance().getCurrentDirectMemReservation();
56 Assert
.assertTrue(reservation
>= initReservation
);
57 Assert
.assertTrue(finalReservation
== initReservation
);
61 public void testCustomReservationListener() throws Exception
{
62 ParquetWriteSupport writeSupport
= ParquetWriteSupport
.writeTempFile(AVRO_SCHEMA_USER
, TMP
.newFolder(), 1, "a");
63 final AtomicLong reserved
= new AtomicLong(0L);
64 ReservationListener listener
= new ReservationListener() {
66 public void reserve(long size
) {
67 reserved
.getAndAdd(size
);
71 public void unreserve(long size
) {
72 reserved
.getAndAdd(-size
);
75 NativeMemoryPool pool
= NativeMemoryPool
.createListenable(listener
);
76 FileSystemDatasetFactory factory
= new FileSystemDatasetFactory(rootAllocator(),
77 pool
, FileFormat
.PARQUET
, writeSupport
.getOutputURI());
78 ScanOptions options
= new ScanOptions(100);
79 long initReservation
= reserved
.get();
80 List
<ArrowRecordBatch
> datum
= collectResultFromFactory(factory
, options
);
81 long reservation
= reserved
.get();
82 AutoCloseables
.close(datum
);
83 AutoCloseables
.close(pool
);
84 long finalReservation
= reserved
.get();
85 Assert
.assertTrue(reservation
>= initReservation
);
86 Assert
.assertTrue(finalReservation
== initReservation
);