# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import numpy as np
import pandas as pd
import pyarrow as pa

from . import common
from .common import KILOBYTE, MEGABYTE


def generate_chunks(total_size, nchunks, ncols, dtype=np.dtype('int64')):
    rowsize = total_size // nchunks // ncols
    assert rowsize % dtype.itemsize == 0

    def make_column(col, chunk):
        return np.frombuffer(common.get_random_bytes(
            rowsize, seed=col + 997 * chunk)).view(dtype)

    return [pd.DataFrame({
        'c' + str(col): make_column(col, chunk)
        for col in range(ncols)})
        for chunk in range(nchunks)]
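

# Illustrative sketch, not part of the upstream file and not an ASV
# benchmark (the leading underscore keeps ASV from collecting it): shows
# the shape of what generate_chunks() returns for a small configuration.
def _example_generate_chunks():
    # 1 MiB split into 4 chunks of 2 int64 columns each:
    # rowsize = 1 MiB // 4 // 2 = 128 KiB per column,
    # i.e. 128 KiB / 8 bytes = 16384 rows per chunk.
    chunks = generate_chunks(MEGABYTE, nchunks=4, ncols=2)
    assert len(chunks) == 4
    assert list(chunks[0].columns) == ['c0', 'c1']
    assert len(chunks[0]) == MEGABYTE // 4 // 2 // 8
    # Different seeds per (col, chunk) make every column distinct,
    # which is what keeps the stream out of the CPU caches below.
    assert not chunks[0]['c0'].equals(chunks[1]['c0'])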


class StreamReader(object):
    """
    Benchmark in-memory streaming to a Pandas dataframe.
    """
    total_size = 64 * MEGABYTE
    ncols = 8
    chunk_sizes = [16 * KILOBYTE, 256 * KILOBYTE, 8 * MEGABYTE]

    param_names = ['chunk_size']
    params = [chunk_sizes]
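
    # ASV convention: ``params`` and ``param_names`` make this a
    # parameterized benchmark, so setup() and time_read_to_dataframe()
    # run once per chunk size (16 KiB, 256 KiB, 8 MiB), always streaming
    # 64 MiB in total.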

    def setup(self, chunk_size):
        # Note we're careful to stream different chunks instead of
        # streaming N times the same chunk, so that we avoid operating
        # entirely out of L1/L2.
        chunks = generate_chunks(self.total_size,
                                 nchunks=self.total_size // chunk_size,
                                 ncols=self.ncols)
        batches = [pa.RecordBatch.from_pandas(df)
                   for df in chunks]
        schema = batches[0].schema
        sink = pa.BufferOutputStream()
        stream_writer = pa.RecordBatchStreamWriter(sink, schema)
        for batch in batches:
            stream_writer.write_batch(batch)
        self.source = sink.getvalue()
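        # The writer is never closed, so the buffer ends without an
        # explicit end-of-stream marker; the stream reader treats a clean
        # EOF as the end of the stream, so read_all() below still
        # consumes every batch.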

    def time_read_to_dataframe(self, *args):
        reader = pa.RecordBatchStreamReader(self.source)
        table = reader.read_all()
        df = table.to_pandas()  # noqa
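

# Manual smoke test: a sketch outside the ASV harness (assumes running as
# ``python -m benchmarks.streaming`` from the parent directory so the
# relative imports above resolve; the timing here is illustrative, not
# what ASV reports).
if __name__ == '__main__':
    import time

    bench = StreamReader()
    for chunk_size in StreamReader.chunk_sizes:
        bench.setup(chunk_size)
        start = time.perf_counter()
        bench.time_read_to_dataframe(chunk_size)
        elapsed = time.perf_counter() - start
        print('chunk_size=%d bytes: %.3f s' % (chunk_size, elapsed))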