]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/python/scripts/test_leak.py
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / python / scripts / test_leak.py
diff --git a/ceph/src/arrow/python/scripts/test_leak.py b/ceph/src/arrow/python/scripts/test_leak.py
new file mode 100644 (file)
index 0000000..f2bbe8d
--- /dev/null
@@ -0,0 +1,110 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pyarrow as pa
+import numpy as np
+import pandas as pd
+from pyarrow.tests.util import rands
+import memory_profiler
+import gc
+import io
+
+MEGABYTE = 1 << 20
+
+
+def assert_does_not_leak(f, iterations=10, check_interval=1, tolerance=5):
+    gc.collect()
+    baseline = memory_profiler.memory_usage()[0]
+    for i in range(iterations):
+        f()
+        if i % check_interval == 0:
+            gc.collect()
+            usage = memory_profiler.memory_usage()[0]
+            diff = usage - baseline
+            print("{0}: {1}\r".format(i, diff), end="")
+            if diff > tolerance:
+                raise Exception("Memory increased by {0} megabytes after {1} "
+                                "iterations".format(diff, i + 1))
+    gc.collect()
+    usage = memory_profiler.memory_usage()[0]
+    diff = usage - baseline
+    print("\nMemory increased by {0} megabytes after {1} "
+          "iterations".format(diff, iterations))
+
+
+def test_leak1():
+    data = [pa.array(np.concatenate([np.random.randn(100000)] * 1000))]
+    table = pa.Table.from_arrays(data, ['foo'])
+
+    def func():
+        table.to_pandas()
+    assert_does_not_leak(func)
+
+
+def test_leak2():
+    data = [pa.array(np.concatenate([np.random.randn(100000)] * 10))]
+    table = pa.Table.from_arrays(data, ['foo'])
+
+    def func():
+        df = table.to_pandas()
+
+        batch = pa.RecordBatch.from_pandas(df)
+
+        sink = io.BytesIO()
+        writer = pa.RecordBatchFileWriter(sink, batch.schema)
+        writer.write_batch(batch)
+        writer.close()
+
+        buf_reader = pa.BufferReader(sink.getvalue())
+        reader = pa.open_file(buf_reader)
+        reader.read_all()
+
+    assert_does_not_leak(func, iterations=50, tolerance=50)
+
+
+def test_leak3():
+    import pyarrow.parquet as pq
+
+    df = pd.DataFrame({'a{0}'.format(i): [1, 2, 3, 4]
+                       for i in range(50)})
+    table = pa.Table.from_pandas(df, preserve_index=False)
+
+    writer = pq.ParquetWriter('leak_test_' + rands(5) + '.parquet',
+                              table.schema)
+
+    def func():
+        writer.write_table(table, row_group_size=len(table))
+
+    # This does not "leak" per se but we do want to have this use as little
+    # memory as possible
+    assert_does_not_leak(func, iterations=500,
+                         check_interval=50, tolerance=20)
+
+
+def test_ARROW_8801():
+    x = pd.to_datetime(np.random.randint(0, 2**32, size=2**20),
+                       unit='ms', utc=True)
+    table = pa.table(pd.DataFrame({'x': x}))
+
+    assert_does_not_leak(lambda: table.to_pandas(split_blocks=False),
+                         iterations=1000, check_interval=50, tolerance=1000)
+
+
+if __name__ == '__main__':
+    test_ARROW_8801()