# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from multiprocessing import Pool
import subprocess
import time

import numpy as np
import pandas as pd

import pyarrow as pa
import pyarrow.plasma as plasma

# The multimerge Cython module is built by "python setup.py install" (see the
# note below).
import multimerge
# To run this example, you will first need to run "python setup.py install" in
# this directory to build the Cython module.
#
# You will only see speedups if you run this code on more data; this is just a
# small example that can run on a laptop.
#
# The values we used to get a speedup (on an m4.10xlarge instance on EC2) were
# object_store_size = 84 * 10 ** 9
# num_cores = 20
# num_rows = 200000000
# num_cols = 2
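#
# In outline, the script implements a parallel sample sort:
#   1. split the DataFrame into num_cores pieces and put them in the object
#      store;
#   2. sort each piece locally, sampling evenly spaced candidate pivots;
#   3. sort the pooled candidates and take evenly spaced global pivots;
#   4. split every sorted piece at the pivots and merge matching buckets
#      with the multimerge Cython kernel into globally sorted runs.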

client = None
object_store_size = 2 * 10 ** 9  # 2 GB
num_cores = 8
num_rows = 200000
num_cols = 2
column_names = [str(i) for i in range(num_cols)]
column_to_sort = column_names[0]


# Connect this process to the object store and seed its RNG (object IDs are
# generated from np.random.bytes, so each pool worker needs a distinct seed).
def connect():
    global client
    client = plasma.connect('/tmp/store')
    np.random.seed(int(time.time() * 10e7) % 10000000)


def put_df(df):
    """Serialize a dataframe and put it in the object store."""
    record_batch = pa.RecordBatch.from_pandas(df)

    # Get the size of the serialized record batch and schema.
    mock_sink = pa.MockOutputStream()
    stream_writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema)
    stream_writer.write_batch(record_batch)
    data_size = mock_sink.size()

    # Generate an ID and allocate a buffer in the object store for the
    # serialized DataFrame.
    object_id = plasma.ObjectID(np.random.bytes(20))
    buf = client.create(object_id, data_size)

    # Write the serialized DataFrame to the object store.
    sink = pa.FixedSizeBufferWriter(buf)
    stream_writer = pa.RecordBatchStreamWriter(sink, record_batch.schema)
    stream_writer.write_batch(record_batch)

    # Seal the object, making it visible to other clients.
    client.seal(object_id)

    return object_id
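
# Note: put_df serializes the record batch twice, once into a MockOutputStream
# just to measure its size and once into the plasma buffer itself, because
# client.create() must be given the exact buffer size up front.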


def get_dfs(object_ids):
    """Retrieve dataframes from the object store given their object IDs."""
    buffers = client.get_buffers(object_ids)
    return [pa.RecordBatchStreamReader(buf).read_next_batch().to_pandas()
            for buf in buffers]
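
# Note: client.get_buffers() blocks until all of the requested objects have
# been sealed, so it also serves as a synchronization point between processes.
# The buffers are views into shared memory; the copy happens in to_pandas().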


def local_sort(object_id):
    """Sort a partition of a dataframe."""
    # Get the dataframe from the object store.
    [df] = get_dfs([object_id])

    # Sort the dataframe.
    sorted_df = df.sort_values(by=column_to_sort)

    # Get evenly spaced values from the dataframe.
    indices = np.linspace(0, len(df) - 1, num=num_cores, dtype=np.int64)

    # Put the sorted dataframe in the object store and return the
    # corresponding object ID as well as the sampled values.
    return put_df(sorted_df), sorted_df.to_numpy().take(indices)


def local_partitions(object_id_and_pivots):
    """Take a sorted partition of a dataframe and split it into more pieces."""
    object_id, pivots = object_id_and_pivots
    [df] = get_dfs([object_id])
    split_at = df[column_to_sort].searchsorted(pivots)
    split_at = [0] + list(split_at) + [len(df)]

    # Partition the sorted dataframe and put each partition into the object
    # store.
    return [put_df(df[i:j]) for i, j in zip(split_at[:-1], split_at[1:])]
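
# For example (illustrative values only): with pivots [-0.7, 0.1] and a
# partition whose sort column is [-1.2, -0.3, 0.4, 0.9], searchsorted returns
# [1, 2], split_at becomes [0, 1, 2, 4], and the partition is cut into the
# buckets [-1.2], [-0.3], and [0.4, 0.9].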


def merge(object_ids):
    """Merge a number of sorted dataframes into a single sorted dataframe."""
    dfs = get_dfs(object_ids)

    # In order to use our multimerge code, we have to convert the arrays from
    # the Fortran format to the C format.
    arrays = [np.ascontiguousarray(df.to_numpy()) for df in dfs]
    for a in arrays:
        assert a.dtype == np.float64
        assert not np.isfortran(a)

    # Filter out empty arrays.
    arrays = [a for a in arrays if a.shape[0] > 0]
    if len(arrays) == 0:
        return None

    resulting_array = multimerge.multimerge2d(*arrays)
    merged_df2 = pd.DataFrame(resulting_array, columns=column_names)

    return put_df(merged_df2)
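
# Note: to_numpy() on an all-float64 DataFrame usually returns a Fortran-
# ordered (column-major) view of the underlying block, while multimerge2d
# expects C-ordered rows, hence the np.ascontiguousarray() copies above.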


if __name__ == '__main__':
    # Start the plasma store.
    p = subprocess.Popen(['plasma_store',
                          '-s', '/tmp/store',
                          '-m', str(object_store_size)])

    # Connect to the plasma store.
    connect()

    # Connect the processes in the pool.
    pool = Pool(initializer=connect, initargs=(), processes=num_cores)

    # Create a DataFrame from a numpy array.
    df = pd.DataFrame(np.random.randn(num_rows, num_cols),
                      columns=column_names)

    # Partition the DataFrame and put each partition in the object store.
    partition_ids = [put_df(partition) for partition
                     in np.split(df, num_cores)]

    # Begin timing the parallel sort example.
    parallel_sort_start = time.time()

    # Sort each partition and subsample it. The subsampled values will be
    # used to create buckets.
    sorted_df_ids, pivot_groups = list(zip(*pool.map(local_sort,
                                                     partition_ids)))

    # Choose the pivots.
    all_pivots = np.concatenate(pivot_groups)
    indices = np.linspace(0, len(all_pivots) - 1, num=num_cores,
                          dtype=np.int64)
    pivots = np.take(np.sort(all_pivots), indices)
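
    # Note: taking evenly spaced order statistics from the pooled samples
    # approximates the quantiles of the whole dataset, so the buckets the
    # pivots define should be roughly equal in size.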

    # Break all of the sorted partitions into even smaller partitions. Group
    # the object IDs from each bucket together.
    results = list(zip(*pool.map(local_partitions,
                                 zip(sorted_df_ids,
                                     len(sorted_df_ids) * [pivots]))))

    # Merge each of the buckets and store the results in the object store.
    object_ids = pool.map(merge, results)

    resulting_ids = [object_id for object_id in object_ids
                     if object_id is not None]

    # Stop timing the parallel sort example.
    parallel_sort_end = time.time()

    print('Parallel sort took {} seconds.'
          .format(parallel_sort_end - parallel_sort_start))

    # Time a serial sort of the original DataFrame for comparison.
    serial_sort_start = time.time()

    original_sorted_df = df.sort_values(by=column_to_sort)

    serial_sort_end = time.time()

    # Check that we sorted the DataFrame properly.
    sorted_dfs = get_dfs(resulting_ids)
    sorted_df = pd.concat(sorted_dfs)

    print('Serial sort took {} seconds.'
          .format(serial_sort_end - serial_sort_start))

    assert np.allclose(sorted_df.values, original_sorted_df.values)

    # Kill the object store.
    p.kill()