# Vendored from Apache Arrow: python/benchmarks/streaming.py
# (ceph/src/arrow/python/benchmarks/streaming.py)
1 # Licensed to the Apache Software Foundation (ASF) under one
2 # or more contributor license agreements. See the NOTICE file
3 # distributed with this work for additional information
4 # regarding copyright ownership. The ASF licenses this file
5 # to you under the Apache License, Version 2.0 (the
6 # "License"); you may not use this file except in compliance
7 # with the License. You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing,
12 # software distributed under the License is distributed on an
13 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 # KIND, either express or implied. See the License for the
15 # specific language governing permissions and limitations
import numpy as np
import pandas as pd

import pyarrow as pa

from . import common
from .common import KILOBYTE, MEGABYTE
def generate_chunks(total_size, nchunks, ncols, dtype=np.dtype('int64')):
    """Build *nchunks* DataFrames of *ncols* columns totalling *total_size* bytes.

    Each column is deterministic pseudo-random data seeded per
    (column, chunk) pair, so every chunk holds distinct bytes.
    """
    rowsize = total_size // nchunks // ncols
    # Each column must hold a whole number of dtype-sized items.
    assert rowsize % dtype.itemsize == 0

    def make_column(col, chunk):
        # Distinct seed per (col, chunk) keeps chunk contents different.
        raw = common.get_random_bytes(rowsize, seed=col + 997 * chunk)
        return np.frombuffer(raw).view(dtype)

    frames = []
    for chunk in range(nchunks):
        columns = {}
        for col in range(ncols):
            columns['c' + str(col)] = make_column(col, chunk)
        frames.append(pd.DataFrame(columns))
    return frames
class StreamReader(object):
    """
    Benchmark in-memory streaming to a Pandas dataframe.
    """
    # Total payload serialized into the in-memory stream.
    total_size = 64 * MEGABYTE
    # Record-batch sizes benchmarked (asv parameter values).
    chunk_sizes = [16 * KILOBYTE, 256 * KILOBYTE, 8 * MEGABYTE]

    param_names = ['chunk_size']
    params = [chunk_sizes]

    def setup(self, chunk_size):
        # Note we're careful to stream different chunks instead of
        # streaming N times the same chunk, so that we avoid operating
        # entirely out of L1/L2.
        # NOTE(review): the ncols value was lost in extraction; 10 matches
        # the upstream Arrow benchmark — confirm against upstream.
        chunks = generate_chunks(self.total_size,
                                 nchunks=self.total_size // chunk_size,
                                 ncols=10)
        batches = [pa.RecordBatch.from_pandas(df)
                   for df in chunks]
        schema = batches[0].schema
        sink = pa.BufferOutputStream()
        stream_writer = pa.RecordBatchStreamWriter(sink, schema)
        for batch in batches:
            stream_writer.write_batch(batch)
        # Materialized IPC stream bytes, consumed by the timed method below.
        self.source = sink.getvalue()

    def time_read_to_dataframe(self, *args):
        # Timed path: read the whole IPC stream and convert to pandas.
        reader = pa.RecordBatchStreamReader(self.source)
        table = reader.read_all()
        df = table.to_pandas()  # noqa