# Source: git.proxmox.com, ceph.git - ceph/src/arrow/python/benchmarks/io.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
class HighLatencyReader(object):
    """File-like reader that sleeps before every read to simulate latency.

    Wraps a readable object ``raw`` and delegates to it, sleeping
    ``latency`` seconds before each ``read`` so that a buffering layer
    placed on top can be benchmarked against a slow source.
    """

    def __init__(self, raw, latency):
        """
        Parameters
        ----------
        raw : file-like
            Object providing ``read(nbytes)``, ``close()`` and ``closed``.
        latency : float
            Seconds to sleep before each ``read`` call.
        """
        # Every other method delegates to the wrapped object, so keep it.
        self.raw = raw
        self.latency = latency

    def close(self):
        """Close the wrapped object."""
        self.raw.close()

    @property
    def closed(self):
        """Whether the wrapped object reports itself as closed."""
        return self.raw.closed

    def read(self, nbytes=None):
        """Sleep ``self.latency`` seconds, then return ``raw.read(nbytes)``."""
        time.sleep(self.latency)
        return self.raw.read(nbytes)
class HighLatencyWriter(object):
    """File-like writer that sleeps before every write to simulate latency.

    Wraps a writable object ``raw`` and delegates to it, sleeping
    ``latency`` seconds before each ``write`` so that a buffering layer
    placed on top can be benchmarked against a slow sink.
    """

    def __init__(self, raw, latency):
        """
        Parameters
        ----------
        raw : file-like
            Object providing ``write(data)``, ``close()`` and ``closed``.
        latency : float
            Seconds to sleep before each ``write`` call.
        """
        # Every other method delegates to the wrapped object, so keep it.
        self.raw = raw
        self.latency = latency

    def close(self):
        """Close the wrapped object."""
        self.raw.close()

    @property
    def closed(self):
        """Whether the wrapped object reports itself as closed."""
        return self.raw.closed

    def write(self, data):
        """Sleep ``self.latency`` seconds, then forward ``data`` to ``raw``.

        Returns whatever ``raw.write(data)`` returns.
        """
        time.sleep(self.latency)
        # Forward the payload; without this the written bytes are lost.
        return self.raw.write(data)
class BufferedIOHighLatency(object):
    """Benchmark buffered reads and writes over high-latency raw streams.

    Each ``time_*`` method moves ``total_size`` bytes in ``increment``-sized
    chunks through a pyarrow buffered stream (``buffer_size`` bytes) layered
    on top of a raw stream that sleeps ``latency`` seconds per call.
    """

    increment = 1 << 10  # 1 KB per user-level read/write call
    total_size = 16 * (1 << 20)  # 16 MB
    buffer_size = 1 << 20  # 1 MB

    # NOTE(review): presumably consumed by an asv-style benchmark harness
    # that calls each time_* method once per entry of ``params``,
    # passing it as ``latency`` - confirm.
    param_names = ('latency',)
    params = [0, 0.01, 0.1]

    def time_buffered_writes(self, latency):
        """Write ``total_size`` bytes through a buffered slow writer."""
        test_data = b'x' * self.increment
        # Running byte count; must exist before the loop condition reads it.
        bytes_written = 0
        out = pa.BufferOutputStream()
        slow_out = HighLatencyWriter(out, latency)
        buffered_out = pa.output_stream(slow_out,
                                        buffer_size=self.buffer_size)

        while bytes_written < self.total_size:
            buffered_out.write(test_data)
            bytes_written += self.increment
        # Push any bytes still held in the buffer through to the slow
        # writer so that cost is included in the measurement.
        buffered_out.flush()

    def time_buffered_reads(self, latency):
        """Read ``total_size`` bytes through a buffered slow reader."""
        # Running byte count; must exist before the loop condition reads it.
        bytes_read = 0
        reader = pa.input_stream(pa.py_buffer(b'x' * self.total_size))
        slow_reader = HighLatencyReader(reader, latency)
        buffered_reader = pa.input_stream(slow_reader,
                                          buffer_size=self.buffer_size)
        while bytes_read < self.total_size:
            buffered_reader.read(self.increment)
            bytes_read += self.increment