# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from multiprocessing import Pool
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.plasma as plasma
import subprocess
import time

import multimerge

# To run this example, you will first need to run "python setup.py install" in
# this directory to build the Cython module.
#
# You will only see speedups if you run this code on more data; this is just a
# small example that can run on a laptop.
#
# The values we used to get a speedup (on an m4.10xlarge instance on EC2) were
# object_store_size = 84 * 10 ** 9
# num_cores = 20
# num_rows = 10 ** 9
# num_cols = 1

client = None
object_store_size = 2 * 10 ** 9  # 2 GB
num_cores = 8
num_rows = 200000
num_cols = 2
column_names = [str(i) for i in range(num_cols)]
column_to_sort = column_names[0]


# Connect this process's client to the plasma object store.
def connect():
    global client
    client = plasma.connect('/tmp/store')
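    # Reseed NumPy per process: pool workers run connect() as their
    # initializer, and without distinct seeds forked workers could draw
    # identical bytes for the ObjectIDs generated in put_df below.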
    np.random.seed(int(time.time() * 10e7) % 10000000)


def put_df(df):
    record_batch = pa.RecordBatch.from_pandas(df)

    # Get size of record batch and schema
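    # (pa.MockOutputStream counts the bytes that would be written without
    # storing them, so this sizing pass copies no data; it tells us exactly
    # how large a plasma buffer to request from client.create below.)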
    mock_sink = pa.MockOutputStream()
    stream_writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema)
    stream_writer.write_batch(record_batch)
    data_size = mock_sink.size()

    # Generate an ID and allocate a buffer in the object store for the
    # serialized DataFrame
    object_id = plasma.ObjectID(np.random.bytes(20))
    buf = client.create(object_id, data_size)

    # Write the serialized DataFrame to the object store
    sink = pa.FixedSizeBufferWriter(buf)
    stream_writer = pa.RecordBatchStreamWriter(sink, record_batch.schema)
    stream_writer.write_batch(record_batch)

    # Seal the object
    client.seal(object_id)

    return object_id


def get_dfs(object_ids):
    """Retrieve dataframes from the object store given their object IDs."""
    buffers = client.get_buffers(object_ids)
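    # put_df stores exactly one record batch per object, so reading the
    # first batch from each buffer recovers the full DataFrame.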
    return [pa.RecordBatchStreamReader(buf).read_next_batch().to_pandas()
            for buf in buffers]


def local_sort(object_id):
    """Sort a partition of a dataframe."""
    # Get the dataframe from the object store.
    [df] = get_dfs([object_id])
    # Sort the dataframe.
    sorted_df = df.sort_values(by=column_to_sort)
    # Get evenly spaced values from the dataframe.
    indices = np.linspace(0, len(df) - 1, num=num_cores, dtype=np.int64)
    # Put the sorted dataframe in the object store and return the
    # corresponding object ID as well as the sampled values.
    return put_df(sorted_df), sorted_df.to_numpy().take(indices)


def local_partitions(object_id_and_pivots):
    """Take a sorted partition of a dataframe and split it into more pieces."""
    object_id, pivots = object_id_and_pivots
    [df] = get_dfs([object_id])
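    # Each pivot is a global cut point: searchsorted locates it within this
    # already-sorted partition, and together with 0 and len(df) the
    # boundaries define len(pivots) + 1 contiguous slices (one per bucket).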
    split_at = df[column_to_sort].searchsorted(pivots)
    split_at = [0] + list(split_at) + [len(df)]
    # Partition the sorted dataframe and put each partition into the object
    # store.
    return [put_df(df[i:j]) for i, j in zip(split_at[:-1], split_at[1:])]


def merge(object_ids):
    """Merge a number of sorted dataframes into a single sorted dataframe."""
    dfs = get_dfs(object_ids)

    # In order to use our multimerge code, we have to convert the arrays from
    # the Fortran format to the C format.
    arrays = [np.ascontiguousarray(df.to_numpy()) for df in dfs]
    for a in arrays:
        assert a.dtype == np.float64
        assert not np.isfortran(a)

    # Filter out empty arrays.
    arrays = [a for a in arrays if a.shape[0] > 0]

    if len(arrays) == 0:
        return None

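    # multimerge2d comes from the Cython extension built from this directory
    # (see the setup.py note at the top); we assume it performs a k-way merge
    # of the sorted, C-contiguous float64 arrays keyed on the first column,
    # which is column_to_sort here.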
    resulting_array = multimerge.multimerge2d(*arrays)
    merged_df2 = pd.DataFrame(resulting_array, columns=column_names)

    return put_df(merged_df2)


if __name__ == '__main__':
    # Start the plasma store.
    p = subprocess.Popen(['plasma_store',
                          '-s', '/tmp/store',
                          '-m', str(object_store_size)])
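    # Note: this assumes the plasma_store executable that ships with
    # pyarrow is available on PATH.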

    # Connect to the plasma store.
    connect()

    # Start the worker pool; each worker connects to the store via the
    # connect() initializer.
    pool = Pool(initializer=connect, initargs=(), processes=num_cores)

    # Create a DataFrame from a numpy array.
    df = pd.DataFrame(np.random.randn(num_rows, num_cols),
                      columns=column_names)

    partition_ids = [put_df(partition) for partition
                     in np.split(df, num_cores)]
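    # np.split with an integer count requires num_rows to be evenly
    # divisible by num_cores; it raises a ValueError otherwise.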

    # Begin timing the parallel sort example.
    parallel_sort_start = time.time()

    # Sort each partition and subsample them. The subsampled values will be
    # used to create buckets.
    sorted_df_ids, pivot_groups = list(zip(*pool.map(local_sort,
                                                     partition_ids)))

    # Choose the pivots.
    all_pivots = np.concatenate(pivot_groups)
    indices = np.linspace(0, len(all_pivots) - 1, num=num_cores,
                          dtype=np.int64)
    pivots = np.take(np.sort(all_pivots), indices)
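    # The pooled samples approximate the global distribution of the sort
    # key, so evenly spaced values from the sorted samples serve as global
    # bucket boundaries.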

    # Break all of the sorted partitions into even smaller partitions. Group
    # the object IDs from each bucket together.
    results = list(zip(*pool.map(local_partitions,
                                 zip(sorted_df_ids,
                                     len(sorted_df_ids) * [pivots]))))
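    # zip(*...) transposes the per-partition lists, so each entry of
    # results holds, for one bucket, the object IDs from every partition.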

    # Merge each of the buckets and store the results in the object store.
    object_ids = pool.map(merge, results)

    resulting_ids = [object_id for object_id in object_ids
                     if object_id is not None]

    # Stop timing the parallel sort example.
    parallel_sort_end = time.time()

    print('Parallel sort took {} seconds.'
          .format(parallel_sort_end - parallel_sort_start))

    serial_sort_start = time.time()

    original_sorted_df = df.sort_values(by=column_to_sort)

    serial_sort_end = time.time()

    # Check that we sorted the DataFrame properly.

    sorted_dfs = get_dfs(resulting_ids)
    sorted_df = pd.concat(sorted_dfs)

    print('Serial sort took {} seconds.'
          .format(serial_sort_end - serial_sort_start))

    assert np.allclose(sorted_df.values, original_sorted_df.values)

    # Kill the object store.
    p.kill()