# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from multiprocessing import Pool
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.plasma as plasma
import subprocess
import time

import multimerge

# To run this example, you will first need to run "python setup.py install" in
# this directory to build the Cython module.
#
# You will only see speedups if you run this code on more data; this is just a
# small example that can run on a laptop.
#
# The values we used to get a speedup (on an m4.10xlarge instance on EC2) were
# object_store_size = 84 * 10 ** 9
# num_cores = 20
# num_rows = 10 ** 9
# num_cols = 1
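#
# Note (assumption about your environment): this example relies on the Plasma
# object store ("pyarrow.plasma" and the "plasma_store" executable), which only
# ships with older pyarrow releases; Plasma was deprecated and later removed
# from Arrow, so a pyarrow version that still includes it is required.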

client = None
object_store_size = 2 * 10 ** 9  # 2 GB
num_cores = 8
num_rows = 200000
num_cols = 2
column_names = [str(i) for i in range(num_cols)]
column_to_sort = column_names[0]


# Connect to the plasma store (also used as the worker pool initializer).
def connect():
    global client
    client = plasma.connect('/tmp/store')
    np.random.seed(int(time.time() * 10e7) % 10000000)


def put_df(df):
    record_batch = pa.RecordBatch.from_pandas(df)

    # Get size of record batch and schema
    mock_sink = pa.MockOutputStream()
    stream_writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema)
    stream_writer.write_batch(record_batch)
    data_size = mock_sink.size()

    # Generate an ID and allocate a buffer in the object store for the
    # serialized DataFrame
    object_id = plasma.ObjectID(np.random.bytes(20))
    buf = client.create(object_id, data_size)

    # Write the serialized DataFrame to the object store
    sink = pa.FixedSizeBufferWriter(buf)
    stream_writer = pa.RecordBatchStreamWriter(sink, record_batch.schema)
    stream_writer.write_batch(record_batch)

    # Seal the object
    client.seal(object_id)

    return object_id


def get_dfs(object_ids):
    """Retrieve dataframes from the object store given their object IDs."""
    buffers = client.get_buffers(object_ids)
    return [pa.RecordBatchStreamReader(buf).read_next_batch().to_pandas()
            for buf in buffers]


def local_sort(object_id):
    """Sort a partition of a dataframe."""
    # Get the dataframe from the object store.
    [df] = get_dfs([object_id])
    # Sort the dataframe.
    sorted_df = df.sort_values(by=column_to_sort)
    # Get evenly spaced values from the dataframe.
    indices = np.linspace(0, len(df) - 1, num=num_cores, dtype=np.int64)
    # Put the sorted dataframe in the object store and return the corresponding
    # object ID as well as the sampled values.
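    # Note: DataFrame.as_matrix() was removed in newer pandas releases; on
    # those versions, sorted_df.to_numpy() (or sorted_df.values) is the
    # equivalent call here and in merge() below.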
    return put_df(sorted_df), sorted_df.as_matrix().take(indices)


def local_partitions(object_id_and_pivots):
    """Take a sorted partition of a dataframe and split it into more pieces."""
    object_id, pivots = object_id_and_pivots
    [df] = get_dfs([object_id])
    split_at = df[column_to_sort].searchsorted(pivots)
    split_at = [0] + list(split_at) + [len(df)]
    # Partition the sorted dataframe and put each partition into the object
    # store.
    return [put_df(df[i:j]) for i, j in zip(split_at[:-1], split_at[1:])]


def merge(object_ids):
    """Merge a number of sorted dataframes into a single sorted dataframe."""
    dfs = get_dfs(object_ids)

    # In order to use our multimerge code, we have to convert the arrays from
    # the Fortran format to the C format.
    arrays = [np.ascontiguousarray(df.as_matrix()) for df in dfs]
    for a in arrays:
        assert a.dtype == np.float64
        assert not np.isfortran(a)

    # Filter out empty arrays.
    arrays = [a for a in arrays if a.shape[0] > 0]

    if len(arrays) == 0:
        return None

    resulting_array = multimerge.multimerge2d(*arrays)
    merged_df2 = pd.DataFrame(resulting_array, columns=column_names)

    return put_df(merged_df2)


if __name__ == '__main__':
    # Start the plasma store.
    p = subprocess.Popen(['plasma_store',
                          '-s', '/tmp/store',
                          '-m', str(object_store_size)])
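    # Assumption: the store creates its socket at /tmp/store quickly enough
    # for the connect() call below; if connecting fails intermittently, a
    # short time.sleep() here gives the store time to start up.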

    # Connect to the plasma store.
    connect()

    # Connect the processes in the pool.
    pool = Pool(initializer=connect, initargs=(), processes=num_cores)

    # Create a DataFrame from a numpy array.
    df = pd.DataFrame(np.random.randn(num_rows, num_cols),
                      columns=column_names)

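    # Split the DataFrame into one chunk per worker and place each chunk in
    # the object store. Note that np.split requires num_rows to be evenly
    # divisible by num_cores (it is with the defaults above).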
    partition_ids = [put_df(partition) for partition
                     in np.split(df, num_cores)]

    # Begin timing the parallel sort example.
    parallel_sort_start = time.time()

    # Sort each partition and subsample them. The subsampled values will be
    # used to create buckets.
    sorted_df_ids, pivot_groups = list(zip(*pool.map(local_sort,
                                                     partition_ids)))

    # Choose the pivots.
    all_pivots = np.concatenate(pivot_groups)
    indices = np.linspace(0, len(all_pivots) - 1, num=num_cores,
                          dtype=np.int64)
    pivots = np.take(np.sort(all_pivots), indices)

    # Break all of the sorted partitions into even smaller partitions. Group
    # the object IDs from each bucket together.
    results = list(zip(*pool.map(local_partitions,
                                 zip(sorted_df_ids,
                                     len(sorted_df_ids) * [pivots]))))
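    # After the transpose with zip, each element of `results` holds the object
    # IDs for one bucket, one ID from each sorted partition, so each bucket
    # can be merged independently.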

    # Merge each of the buckets and store the results in the object store.
    object_ids = pool.map(merge, results)

    resulting_ids = [object_id for object_id in object_ids
                     if object_id is not None]

    # Stop timing the parallel sort example.
    parallel_sort_end = time.time()

    print('Parallel sort took {} seconds.'
          .format(parallel_sort_end - parallel_sort_start))

    serial_sort_start = time.time()

    original_sorted_df = df.sort_values(by=column_to_sort)

    serial_sort_end = time.time()

    # Check that we sorted the DataFrame properly.

    sorted_dfs = get_dfs(resulting_ids)
    sorted_df = pd.concat(sorted_dfs)

    print('Serial sort took {} seconds.'
          .format(serial_sort_end - serial_sort_start))

    assert np.allclose(sorted_df.values, original_sorted_df.values)

    # Kill the object store.
    p.kill()