]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | # Licensed to the Apache Software Foundation (ASF) under one |
2 | # or more contributor license agreements. See the NOTICE file | |
3 | # distributed with this work for additional information | |
4 | # regarding copyright ownership. The ASF licenses this file | |
5 | # to you under the Apache License, Version 2.0 (the | |
6 | # "License"); you may not use this file except in compliance | |
7 | # with the License. You may obtain a copy of the License at | |
8 | # | |
9 | # http://www.apache.org/licenses/LICENSE-2.0 | |
10 | # | |
11 | # Unless required by applicable law or agreed to in writing, | |
12 | # software distributed under the License is distributed on an | |
13 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | # KIND, either express or implied. See the License for the | |
15 | # specific language governing permissions and limitations | |
16 | # under the License. | |
17 | ||
18 | ||
19 | import multiprocessing | |
20 | import os | |
21 | import pytest | |
22 | import random | |
23 | import signal | |
24 | import struct | |
25 | import subprocess | |
26 | import sys | |
27 | import time | |
28 | ||
29 | import numpy as np | |
30 | import pyarrow as pa | |
31 | ||
32 | ||
# Capacity, in bytes, of the Plasma store started for each test (100 MB).
DEFAULT_PLASMA_STORE_MEMORY = 100000000
# Run the store under Valgrind when the environment sets PLASMA_VALGRIND=1.
USE_VALGRIND = os.environ.get("PLASMA_VALGRIND") == "1"
# URL of an external store backend; presumably used by tests outside this
# view.
EXTERNAL_STORE = "hashtable://test"
# Extra bytes added to sizes that are expected to exceed the free space.
SMALL_OBJECT_SIZE = 9000
37 | ||
38 | ||
def random_name():
    """Return a random decimal string for a number drawn from 0..99999999."""
    suffix = random.randint(0, 99999999)
    return "%d" % suffix
41 | ||
42 | ||
def random_object_id():
    """Return a new plasma ObjectID built from 20 random bytes.

    The bytes come from NumPy's global RNG (``np.random.bytes``), so seeding
    ``np.random`` makes the sequence of IDs reproducible.
    """
    # Imported inside the function; presumably so that importing this module
    # does not require the plasma extension.
    import pyarrow.plasma as plasma
    raw_id = np.random.bytes(20)
    return plasma.ObjectID(raw_id)
46 | ||
47 | ||
def generate_metadata(length):
    """Return a ``bytearray`` of ``length`` bytes with random bytes mixed in.

    When ``length`` is positive, the first and last bytes are randomized and
    then up to 100 random positions are overwritten with random values; all
    other bytes stay zero. ``length == 0`` yields an empty bytearray.
    """
    metadata = bytearray(length)
    if length <= 0:
        return metadata
    metadata[0] = random.randint(0, 255)
    metadata[-1] = random.randint(0, 255)
    last = length - 1
    for _ in range(100):
        # Draw the value before the position: that is the order in which
        # ``buf[randint(...)] = randint(...)`` evaluates its operands.
        value = random.randint(0, 255)
        metadata[random.randint(0, last)] = value
    return metadata
56 | ||
57 | ||
def write_to_data_buffer(buff, length):
    """Write random bytes into the writable buffer ``buff`` in place.

    ``buff`` is viewed as ``uint8``. When ``length`` is positive, its first
    and last bytes plus up to 100 random positions in ``[0, length)`` are
    overwritten. Callers in this file pass the buffer's own size as
    ``length``. Returns None.
    """
    view = np.frombuffer(buff, dtype="uint8")
    if length <= 0:
        return
    view[0] = random.randint(0, 255)
    view[-1] = random.randint(0, 255)
    for _ in range(100):
        # Value before index, mirroring Python's evaluation order for
        # ``view[randint(...)] = randint(...)``.
        byte = random.randint(0, 255)
        view[random.randint(0, length - 1)] = byte
65 | ||
66 | ||
def create_object_with_id(client, object_id, data_size, metadata_size,
                          seal=True):
    """Create object ``object_id`` through ``client`` with random contents.

    Random metadata of ``metadata_size`` bytes is attached, and the
    ``data_size``-byte data buffer is filled by ``write_to_data_buffer``.
    The object is sealed unless ``seal`` is false.

    Returns:
        ``(memory_buffer, metadata)``: the buffer returned by
        ``client.create`` and the metadata bytearray that was attached.
    """
    meta = generate_metadata(metadata_size)
    buf = client.create(object_id, data_size, meta)
    write_to_data_buffer(buf, data_size)
    if not seal:
        return buf, meta
    client.seal(object_id)
    return buf, meta
75 | ||
76 | ||
def create_object(client, data_size, metadata_size=0, seal=True):
    """Create an object with random contents under a fresh random ID.

    Arguments are forwarded to ``create_object_with_id``.

    Returns:
        ``(object_id, memory_buffer, metadata)``.
    """
    new_id = random_object_id()
    buf, meta = create_object_with_id(
        client, new_id, data_size, metadata_size, seal=seal)
    return new_id, buf, meta
83 | ||
84 | ||
85 | @pytest.mark.plasma | |
86 | class TestPlasmaClient: | |
87 | ||
def setup_method(self, test_method):
    """Start a fresh Plasma store and connect two clients to it.

    Sets ``plasma_store_ctx`` (the store context manager), the
    ``plasma_store_name`` / ``p`` pair yielded by entering it (``p`` is the
    store process handle that teardown_method polls and signals), and the
    two clients ``plasma_client`` and ``plasma_client2``.
    """
    import pyarrow.plasma as plasma
    # Start Plasma store.
    self.plasma_store_ctx = plasma.start_plasma_store(
        plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
        use_valgrind=USE_VALGRIND)
    self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
    try:
        # Connect to Plasma.
        self.plasma_client = plasma.connect(self.plasma_store_name)
        self.plasma_client2 = plasma.connect(self.plasma_store_name)
    except BaseException:
        # pytest does not run teardown_method when setup_method fails, so
        # shut the store down here instead of leaking its process.
        self.plasma_store_ctx.__exit__(None, None, None)
        raise
98 | ||
def teardown_method(self, test_method):
    """Stop the Plasma store and check that it exits cleanly.

    Fails if the store process died during the test, if it does not finish
    within 5 seconds of SIGTERM, or if it exits with a non-zero status. The
    store context is exited in every case.
    """
    try:
        proc = self.p
        # The store must have survived the test.
        assert proc.poll() is None
        # Give Valgrind and/or coverage a clean exit. Valgrind misses a
        # SIGTERM delivered before the event loop is ready; sleeping only
        # narrows that race, it does not remove it.
        if USE_VALGRIND:
            time.sleep(1.0)
        proc.send_signal(signal.SIGTERM)
        proc.wait(timeout=5)
        assert proc.returncode == 0
    finally:
        self.plasma_store_ctx.__exit__(None, None, None)
114 | ||
def test_connection_failure_raises_exception(self):
    """Connecting to a store that does not exist raises (ARROW-1264)."""
    import pyarrow.plasma as plasma
    # IOError is an alias of OSError on Python 3.
    with pytest.raises(OSError):
        plasma.connect('unknown-store-name', num_retries=1)
120 | ||
def test_create(self):
    """Bytes written into a created buffer are readable after sealing."""
    length = 50
    expected = (np.arange(length) % 256).astype("uint8")
    object_id = random_object_id()
    # Create a writable buffer and fill it with a known byte pattern.
    view = np.frombuffer(self.plasma_client.create(object_id, length),
                         dtype="uint8")
    view[:] = expected
    self.plasma_client.seal(object_id)
    # Fetch the sealed object and compare its first ``length`` bytes.
    view = np.frombuffer(
        self.plasma_client.get_buffers([object_id])[0], dtype="uint8")
    np.testing.assert_array_equal(view[:length], expected)
138 | ||
def test_create_with_metadata(self):
    """Data and metadata of sizes 0, 3, ..., 999 round-trip unchanged."""
    client = self.plasma_client
    for length in range(0, 1000, 3):
        object_id = random_object_id()
        metadata = generate_metadata(length)
        pattern = (np.arange(length) % 256).astype("uint8")
        # The data buffer has the same size as the metadata.
        view = np.frombuffer(client.create(object_id, length, metadata),
                             dtype="uint8")
        view[:] = pattern
        client.seal(object_id)
        # Both the data and the metadata must come back unchanged.
        view = np.frombuffer(client.get_buffers([object_id])[0],
                             dtype="uint8")
        np.testing.assert_array_equal(view[:length], pattern)
        stored_meta = np.frombuffer(client.get_metadata([object_id])[0],
                                    dtype="uint8")
        assert stored_meta.tobytes() == bytes(metadata)
165 | ||
def test_create_existing(self):
    """Creating an object under an ID that already exists must raise.

    Repeated 1000 times to exercise the duplicate-ID code path in the store.
    """
    length = 100
    for _ in range(1000):
        object_id = random_object_id()
        self.plasma_client.create(object_id, length,
                                  generate_metadata(length))
        # pytest.raises still fails the test under ``python -O``, which
        # would strip a bare ``assert False``.
        # TODO(pcm): Introduce a more specific error type here.
        with pytest.raises(pa.lib.ArrowException):
            self.plasma_client.create(object_id, length,
                                      generate_metadata(length))
182 | ||
def test_create_and_seal(self):
    """create_and_seal stores data and metadata, rejects duplicate IDs,
    and produces objects that the store can evict."""
    # Create a bunch of objects.
    object_ids = []
    for i in range(1000):
        object_id = random_object_id()
        object_ids.append(object_id)
        self.plasma_client.create_and_seal(object_id, i * b'a', i * b'b')

    for i in range(1000):
        [data_tuple] = self.plasma_client.get_buffers([object_ids[i]],
                                                      with_meta=True)
        assert data_tuple[1].to_pybytes() == i * b'a'
        assert (self.plasma_client.get_metadata(
            [object_ids[i]])[0].to_pybytes() ==
            i * b'b')

    # Make sure that creating the same object twice raises an exception.
    object_id = random_object_id()
    self.plasma_client.create_and_seal(object_id, b'a', b'b')
    with pytest.raises(pa.plasma.PlasmaObjectExists):
        self.plasma_client.create_and_seal(object_id, b'a', b'b')

    # Make sure that these objects can be evicted. Each one holds
    # DEFAULT_PLASMA_STORE_MEMORY // 10 bytes of data plus as much metadata,
    # so the 20 of them cannot all fit; the first 10 are expected to be
    # gone afterwards (presumably evicted in LRU order).
    big_object = DEFAULT_PLASMA_STORE_MEMORY // 10 * b'a'
    object_ids = []
    for _ in range(20):
        object_id = random_object_id()
        object_ids.append(object_id)
        # Create the object under the recorded ID; using any other ID would
        # make the ``contains`` check below pass trivially.
        self.plasma_client.create_and_seal(object_id, big_object,
                                           big_object)
    for i in range(10):
        assert not self.plasma_client.contains(object_ids[i])
216 | ||
def test_get(self):
    """get_buffers honours timeouts and returns None for absent objects.

    Covers three cases: every requested ID missing, only the even-indexed
    IDs present, and an object created by this client but not yet sealed.
    """
    num_object_ids = 60
    # Test timing out of get with various timeouts.
    for timeout in [0, 10, 100, 1000]:
        object_ids = [random_object_id() for _ in range(num_object_ids)]
        results = self.plasma_client.get_buffers(object_ids,
                                                 timeout_ms=timeout)
        assert results == num_object_ids * [None]

    # Create and seal only the even-indexed IDs from the last batch.
    data_buffers = []
    metadata_buffers = []
    for i in range(num_object_ids):
        if i % 2 == 0:
            data_buffer, metadata_buffer = create_object_with_id(
                self.plasma_client, object_ids[i], 2000, 2000)
            data_buffers.append(data_buffer)
            metadata_buffers.append(metadata_buffer)

    # Test timing out from some but not all get calls with various
    # timeouts.
    for timeout in [0, 10, 100, 1000]:
        data_results = self.plasma_client.get_buffers(object_ids,
                                                      timeout_ms=timeout)
        # metadata_results = self.plasma_client.get_metadata(
        #     object_ids, timeout_ms=timeout)
        for i in range(num_object_ids):
            if i % 2 == 0:
                array1 = np.frombuffer(data_buffers[i // 2], dtype="uint8")
                array2 = np.frombuffer(data_results[i], dtype="uint8")
                np.testing.assert_equal(array1, array2)
                # TODO(rkn): We should compare the metadata as well. But
                # currently the types are different (e.g., memoryview
                # versus bytearray).
                # assert plasma.buffers_equal(
                #     metadata_buffers[i // 2], metadata_results[i])
            else:
                # Odd-indexed objects were never created, so this call
                # must return None for them. Check this call's
                # ``data_results``, not the earlier, always all-None
                # ``results``.
                assert data_results[i] is None

    # Test trying to get an object that was created by the same client but
    # not sealed.
    object_id = random_object_id()
    self.plasma_client.create(object_id, 10, b"metadata")
    assert self.plasma_client.get_buffers(
        [object_id], timeout_ms=0, with_meta=True)[0][1] is None
    assert self.plasma_client.get_buffers(
        [object_id], timeout_ms=1, with_meta=True)[0][1] is None
    self.plasma_client.seal(object_id)
    assert self.plasma_client.get_buffers(
        [object_id], timeout_ms=0, with_meta=True)[0][1] is not None
266 | ||
def test_buffer_lifetime(self):
    """A RecordBatch read from Plasma stays valid after its buffer is
    dropped (ARROW-2195).

    The batch is written through ``plasma_client`` and read back through
    ``plasma_client2``. After the Python names for the fetched buffer and
    the reader are deleted, ``read_batch`` must still equal the original.
    """
    # ARROW-2195
    arr = pa.array([1, 12, 23, 3, 34], pa.int32())
    batch = pa.RecordBatch.from_arrays([arr], ['field1'])

    # Serialize RecordBatch into Plasma store
    # Dry run into a MockOutputStream, only to learn the size of the
    # complete (closed) stream.
    sink = pa.MockOutputStream()
    writer = pa.RecordBatchStreamWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()

    # Real write into an object of exactly that size.
    object_id = random_object_id()
    data_buffer = self.plasma_client.create(object_id, sink.size())
    stream = pa.FixedSizeBufferWriter(data_buffer)
    writer = pa.RecordBatchStreamWriter(stream, batch.schema)
    writer.write_batch(batch)
    writer.close()
    self.plasma_client.seal(object_id)
    # Drop the name bound to the created buffer before reading back.
    del data_buffer

    # Unserialize RecordBatch from Plasma store
    [data_buffer] = self.plasma_client2.get_buffers([object_id])
    reader = pa.RecordBatchStreamReader(data_buffer)
    read_batch = reader.read_next_batch()
    # Lose reference to returned buffer. The RecordBatch must still
    # be backed by valid memory.
    del data_buffer, reader

    assert read_batch.equals(batch)
296 | ||
def test_put_and_get(self):
    """put/get round-trip Python values; a missing ID yields
    ObjectNotAvailable when get is called with a zero timeout."""
    client = self.plasma_client
    samples = (["hello", "world", 3, 1.0], None, "hello")
    for value in samples:
        oid = client.put(value)
        # A list of IDs yields a list of values...
        (from_list,) = client.get([oid])
        assert from_list == value
        # ...while a bare ID yields the value itself.
        assert client.get(oid) == value

    missing_id = random_object_id()
    (missing,) = client.get([missing_id], timeout_ms=0)
    assert missing == pa.plasma.ObjectNotAvailable
309 | ||
@pytest.mark.filterwarnings(
    "ignore:'pyarrow.deserialize':FutureWarning")
def test_put_and_get_raw_buffer(self):
    """Raw buffers stored with custom metadata come back verbatim, values
    stored with ``put`` are deserialized, and a missing object maps to
    ObjectNotAvailable."""
    client = self.plasma_client
    temp_id = random_object_id()
    use_meta = b"RAW"

    def deserialize_or_output(data_tuple):
        # ``data_tuple`` is the (metadata, data) pair from get_buffers.
        meta, data = data_tuple[0], data_tuple[1]
        if meta == use_meta:
            return data.to_pybytes()
        if data is None:
            return pa.plasma.ObjectNotAvailable
        return pa.deserialize(data)

    for value in [b"Bytes Test", temp_id.binary(), 10 * b"\x00", 123]:
        if isinstance(value, bytes):
            object_id = client.put_raw_buffer(value, metadata=use_meta)
        else:
            object_id = client.put(value)
        [result] = client.get_buffers([object_id], with_meta=True)
        assert deserialize_or_output(result) == value

    missing_id = random_object_id()
    [result] = client.get_buffers([missing_id], timeout_ms=0,
                                  with_meta=True)
    assert deserialize_or_output(result) == pa.plasma.ObjectNotAvailable
342 | ||
@pytest.mark.filterwarnings(
    "ignore:'serialization_context':FutureWarning")
def test_put_and_get_serialization_context(self):
    """A custom type needs a registered SerializationContext for both
    ``put`` and ``get``."""

    class CustomType:
        def __init__(self, val):
            self.val = val

    original = CustomType(42)

    # Without a context the type cannot be serialized.
    with pytest.raises(pa.ArrowSerializationError):
        self.plasma_client.put(original)

    context = pa.lib.SerializationContext()
    # Register the type under a 20-character type ID of NUL characters.
    context.register_type(CustomType, "\x00" * 20)

    # ``None`` is passed positionally; presumably it lets put() pick the ID.
    object_id = self.plasma_client.put(
        original, None, serialization_context=context)

    # Reading it back without the context must fail as well.
    with pytest.raises(pa.ArrowSerializationError):
        self.plasma_client.get(object_id)

    # ``-1`` is passed positionally; presumably it is the timeout.
    restored = self.plasma_client.get(
        object_id, -1, serialization_context=context)
    assert restored.val == original.val
368 | ||
def test_store_arrow_objects(self):
    """A tensor written with pa.ipc.write_tensor round-trips via Plasma."""
    data = np.random.randn(10, 4)
    object_id = random_object_id()
    tensor = pa.Tensor.from_numpy(data)
    # Allocate exactly the serialized size and write the tensor into it.
    buf = self.plasma_client.create(object_id,
                                    pa.ipc.get_tensor_size(tensor))
    writer = pa.FixedSizeBufferWriter(buf)
    pa.ipc.write_tensor(tensor, writer)
    self.plasma_client.seal(object_id)
    # Read it back from the sealed object and compare with the source.
    [stored] = self.plasma_client.get_buffers([object_id])
    reader = pa.BufferReader(stored)
    round_tripped = pa.ipc.read_tensor(reader).to_numpy()
    np.testing.assert_equal(data, round_tripped)
385 | ||
@pytest.mark.pandas
def test_store_pandas_dataframe(self):
    """A DataFrame written as a record-batch stream round-trips via Plasma."""
    import pandas as pd
    import pyarrow.plasma as plasma
    # Two float Series with different indexes.
    d = {'one': pd.Series([1., 2., 3.], index=['a', 'b', 'c']),
         'two': pd.Series([1., 2., 3., 4.], index=['a', 'b', 'c', 'd'])}
    df = pd.DataFrame(d)

    # Write the DataFrame.
    record_batch = pa.RecordBatch.from_pandas(df)
    # Determine the size.
    # A dry run into a MockOutputStream measures the bytes written.
    s = pa.MockOutputStream()
    stream_writer = pa.RecordBatchStreamWriter(s, record_batch.schema)
    stream_writer.write_batch(record_batch)
    data_size = s.size()
    # Same 20-random-byte construction as random_object_id().
    object_id = plasma.ObjectID(np.random.bytes(20))

    buf = self.plasma_client.create(object_id, data_size)
    stream = pa.FixedSizeBufferWriter(buf)
    stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
    stream_writer.write_batch(record_batch)
    # NOTE(review): unlike test_buffer_lifetime, neither writer is closed.
    # data_size therefore excludes anything close() would write, and the
    # buffer has no room for it. Looks intentional; confirm before adding
    # a close() call.

    self.plasma_client.seal(object_id)

    # Read the DataFrame.
    [data] = self.plasma_client.get_buffers([object_id])
    reader = pa.RecordBatchStreamReader(pa.BufferReader(data))
    result = reader.read_next_batch().to_pandas()

    pd.testing.assert_frame_equal(df, result)
416 | ||
def test_pickle_object_ids(self):
    """ObjectIDs survive a pickle round trip, which allows sharing them
    between processes."""
    import pickle
    original = random_object_id()
    restored = pickle.loads(pickle.dumps(original))
    assert original == restored
424 | ||
def test_store_full(self):
    """create() must fail once a request exceeds the store's free space.

    The store is started with DEFAULT_PLASMA_STORE_MEMORY bytes (10 ** 8,
    i.e. 100 MB). Buffers appended to ``memory_buffers`` stay referenced
    for the rest of the test. Buffers deleted right after creation are
    expected to become evictable, so they do not reduce the free space.
    """
    def assert_create_raises_plasma_full(unit_test, size):
        # Split ``size`` at a random point between data and metadata.
        partial_size = np.random.randint(size)
        # pytest.raises still fails under ``python -O``, which would strip
        # a bare ``assert False``.
        # TODO(pcm): More specific error here.
        with pytest.raises(pa.lib.ArrowException):
            create_object(unit_test.plasma_client, partial_size,
                          size - partial_size)

    PERCENT = DEFAULT_PLASMA_STORE_MEMORY // 100

    # Create a list to keep some of the buffers in scope.
    memory_buffers = []
    _, memory_buffer, _ = create_object(self.plasma_client, 50 * PERCENT)
    memory_buffers.append(memory_buffer)
    # Remaining space is 50%. Make sure that we can't create an object of
    # size 50% + SMALL_OBJECT_SIZE, but we can create (and release) ones of
    # size 20%.
    assert_create_raises_plasma_full(
        self, 50 * PERCENT + SMALL_OBJECT_SIZE)
    _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
    del memory_buffer
    _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
    del memory_buffer
    # Releasing the 20% objects restores the 50% headroom, but no more.
    assert_create_raises_plasma_full(
        self, 50 * PERCENT + SMALL_OBJECT_SIZE)

    _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
    memory_buffers.append(memory_buffer)
    # Remaining space is 30%.
    assert_create_raises_plasma_full(
        self, 30 * PERCENT + SMALL_OBJECT_SIZE)

    _, memory_buffer, _ = create_object(self.plasma_client, 10 * PERCENT)
    memory_buffers.append(memory_buffer)
    # Remaining space is 20%.
    assert_create_raises_plasma_full(
        self, 20 * PERCENT + SMALL_OBJECT_SIZE)
469 | ||
def test_contains(self):
    """contains() is true exactly for objects that were created and sealed."""
    client = self.plasma_client
    fake_object_ids = [random_object_id() for _ in range(100)]
    real_object_ids = [random_object_id() for _ in range(100)]
    for object_id in real_object_ids:
        assert client.contains(object_id) is False
        client.create(object_id, 100)
        client.seal(object_id)
        assert client.contains(object_id)
    assert not any(client.contains(object_id)
                   for object_id in fake_object_ids)
    assert all(client.contains(object_id) for object_id in real_object_ids)
482 | ||
def test_hash(self):
    """hash() is deterministic and sensitive to both data and metadata.

    Also checks that hashing an object that does not exist raises.
    """
    length = 1000

    def create_and_seal(object_id, metadata, offset):
        # Create a ``length``-byte object whose byte i is (i + offset) % 256,
        # attach ``metadata``, and seal it.
        memory_buffer = np.frombuffer(
            self.plasma_client.create(object_id, length, metadata),
            dtype="uint8")
        memory_buffer[:] = (np.arange(length) + offset) % 256
        self.plasma_client.seal(object_id)

    # Check the hash of an object that doesn't exist.
    object_id1 = random_object_id()
    # pytest.raises still fails under ``python -O``, which would strip a
    # bare ``assert False``.
    # TODO(pcm): Introduce a more specific error type here
    with pytest.raises(pa.lib.ArrowException):
        self.plasma_client.hash(object_id1)

    # Create a random object, and check that the hash function always
    # returns the same value.
    metadata = generate_metadata(length)
    create_and_seal(object_id1, metadata, 0)
    assert (self.plasma_client.hash(object_id1) ==
            self.plasma_client.hash(object_id1))

    # A second object with the same data and metadata as the first must
    # hash equally.
    object_id2 = random_object_id()
    create_and_seal(object_id2, metadata, 0)
    assert (self.plasma_client.hash(object_id1) ==
            self.plasma_client.hash(object_id2))

    # A third object with different data (and freshly generated metadata)
    # must hash differently.
    object_id3 = random_object_id()
    metadata = generate_metadata(length)
    create_and_seal(object_id3, metadata, 1)
    assert (self.plasma_client.hash(object_id1) !=
            self.plasma_client.hash(object_id3))

    # A fourth object with the third's data but different metadata must
    # hash differently from all previous objects.
    object_id4 = random_object_id()
    metadata4 = generate_metadata(length)
    create_and_seal(object_id4, metadata4, 1)
    assert (self.plasma_client.hash(object_id1) !=
            self.plasma_client.hash(object_id4))
    assert (self.plasma_client.hash(object_id3) !=
            self.plasma_client.hash(object_id4))
551 | ||
def test_many_hashes(self):
    """Hashes of 256 + 2 * 1024 distinct small objects are all different."""
    client = self.plasma_client
    length = 2 ** 10
    hashes = []

    # 256 objects of ``length`` bytes, each filled with one byte value.
    for value in range(256):
        object_id = random_object_id()
        view = np.frombuffer(client.create(object_id, length),
                             dtype="uint8")
        view[:] = value
        client.seal(object_id)
        hashes.append(client.hash(object_id))

    # ``length`` objects of ``length`` bytes, all zero except a single 1 at
    # a different position in each, so any two differ in exactly two bits.
    for position in range(length):
        object_id = random_object_id()
        view = np.frombuffer(client.create(object_id, length),
                             dtype="uint8")
        view[:] = 0
        view[position] = 1
        client.seal(object_id)
        hashes.append(client.hash(object_id))

    # ``length`` all-zero objects of sizes 0, 1, ..., length - 1.
    for size in range(length):
        object_id = random_object_id()
        view = np.frombuffer(client.create(object_id, size), dtype="uint8")
        view[:] = 0
        client.seal(object_id)
        hashes.append(client.hash(object_id))

    # Every hash must be unique.
    assert len(set(hashes)) == 256 + length + length
591 | ||
592 | # def test_individual_delete(self): | |
593 | # length = 100 | |
594 | # # Create an object id string. | |
595 | # object_id = random_object_id() | |
596 | # # Create a random metadata string. | |
597 | # metadata = generate_metadata(100) | |
598 | # # Create a new buffer and write to it. | |
599 | # memory_buffer = self.plasma_client.create(object_id, length, | |
600 | # metadata) | |
601 | # for i in range(length): | |
602 | # memory_buffer[i] = chr(i % 256) | |
603 | # # Seal the object. | |
604 | # self.plasma_client.seal(object_id) | |
605 | # # Check that the object is present. | |
606 | # assert self.plasma_client.contains(object_id) | |
607 | # # Delete the object. | |
608 | # self.plasma_client.delete(object_id) | |
609 | # # Make sure the object is no longer present. | |
610 | # self.assertFalse(self.plasma_client.contains(object_id)) | |
611 | # | |
612 | # def test_delete(self): | |
613 | # # Create some objects. | |
614 | # object_ids = [random_object_id() for _ in range(100)] | |
615 | # for object_id in object_ids: | |
616 | # length = 100 | |
617 | # # Create a random metadata string. | |
618 | # metadata = generate_metadata(100) | |
619 | # # Create a new buffer and write to it. | |
620 | # memory_buffer = self.plasma_client.create(object_id, length, | |
621 | # metadata) | |
622 | # for i in range(length): | |
623 | # memory_buffer[i] = chr(i % 256) | |
624 | # # Seal the object. | |
625 | # self.plasma_client.seal(object_id) | |
626 | # # Check that the object is present. | |
627 | # assert self.plasma_client.contains(object_id) | |
628 | # | |
629 | # # Delete the objects and make sure they are no longer present. | |
630 | # for object_id in object_ids: | |
631 | # # Delete the object. | |
632 | # self.plasma_client.delete(object_id) | |
633 | # # Make sure the object is no longer present. | |
634 | # self.assertFalse(self.plasma_client.contains(object_id)) | |
635 | ||
def test_illegal_functionality(self):
    """Reading past the end of a created buffer and writing to a fetched
    buffer must both raise."""
    object_id = random_object_id()
    length = 1000
    buf = self.plasma_client.create(object_id, length)
    # Index ``length`` is one past the end of the buffer.
    with pytest.raises(Exception):
        buf[length]
    self.plasma_client.seal(object_id)
    # Not checked: that the creator's buffer becomes read-only once sealed.
    # That check is disabled because it currently fails.
    buf = self.plasma_client.get_buffers([object_id])[0]

    def illegal_assignment():
        buf[0] = chr(0)

    # The fetched buffer must reject writes.
    # NOTE(review): assigning a ``str`` may raise regardless of whether the
    # buffer is mutable, which would make this check weaker than intended.
    with pytest.raises(Exception):
        illegal_assignment()
661 | ||
def test_evict(self):
    """Check which objects client.evict(num_bytes) frees and what it returns.

    Going by the expected values below, evict() frees only sealed objects
    that no buffer still references, oldest-released first. It keeps going
    until at least ``num_bytes`` are freed and returns the number of bytes
    actually freed.
    NOTE(review): this description is inferred from the assertions, not from
    the store implementation.
    """
    client = self.plasma_client2
    object_id1 = random_object_id()
    b1 = client.create(object_id1, 1000)
    client.seal(object_id1)
    # Dropping the only buffer reference makes object 1 evictable.
    del b1
    assert client.evict(1) == 1000

    object_id2 = random_object_id()
    object_id3 = random_object_id()
    b2 = client.create(object_id2, 999)
    b3 = client.create(object_id3, 998)
    client.seal(object_id3)
    del b3
    # Object 2 is unsealed and still referenced through b2, so only
    # object 3 (998 bytes) is freed even though 1000 bytes were requested.
    assert client.evict(1000) == 998

    object_id4 = random_object_id()
    b4 = client.create(object_id4, 997)
    client.seal(object_id4)
    del b4
    client.seal(object_id2)
    del b2
    # Object 4 was released before object 2, so it is evicted first.
    assert client.evict(1) == 997
    assert client.evict(1) == 999

    object_id5 = random_object_id()
    object_id6 = random_object_id()
    object_id7 = random_object_id()
    b5 = client.create(object_id5, 996)
    b6 = client.create(object_id6, 995)
    b7 = client.create(object_id7, 994)
    client.seal(object_id5)
    client.seal(object_id6)
    client.seal(object_id7)
    del b5
    del b6
    del b7
    # 996 + 995 < 2000, so all three objects must be evicted.
    assert client.evict(2000) == 996 + 995 + 994
700 | ||
# Object counts exercised by the subscribe tests. The largest count is
# skipped under Valgrind to mitigate Valgrind-induced slowness.
SUBSCRIBE_TEST_SIZES = [1, 10, 100, 1000] + (
    [] if USE_VALGRIND else [10000])
704 | ||
def test_subscribe(self):
    """Each sealed object produces one notification, in creation order,
    carrying its ID, data size and metadata size."""
    client = self.plasma_client
    # Subscribe to notifications from the Plasma Store.
    client.subscribe()
    for count in self.SUBSCRIBE_TEST_SIZES:
        object_ids = [random_object_id() for _ in range(count)]
        metadata_sizes = [np.random.randint(1000) for _ in range(count)]
        data_sizes = [np.random.randint(1000) for _ in range(count)]
        expected = list(zip(object_ids, data_sizes, metadata_sizes))
        for object_id, data_size, metadata_size in expected:
            client.create(
                object_id, data_size,
                metadata=bytearray(np.random.bytes(metadata_size)))
            client.seal(object_id)
        # One notification per object, in the same order.
        for object_id, data_size, metadata_size in expected:
            recv_objid, recv_dsize, recv_msize = (
                client.get_next_notification())
            assert object_id == recv_objid
            assert data_size == recv_dsize
            assert metadata_size == recv_msize
724 | ||
def test_subscribe_socket(self):
    """Notifications read straight from the notification socket decode to
    the created objects' IDs and sizes, in creation order."""
    # Each message is an 8-byte native-endian length followed by that many
    # payload bytes. The "Q" code is 8 bytes on every platform; the native
    # "L" code is only 4 bytes on LLP64 platforms such as Windows.
    header = struct.Struct("Q")

    def recv_exactly(sock, num_bytes):
        # recv() on a stream socket may return fewer bytes than requested,
        # so keep reading until the whole message has arrived.
        chunks = []
        remaining = num_bytes
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                raise EOFError("plasma notification socket closed")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    # Subscribe to notifications from the Plasma Store.
    self.plasma_client.subscribe()
    rsock = self.plasma_client.get_notification_socket()
    for i in self.SUBSCRIBE_TEST_SIZES:
        # Get notification from socket.
        object_ids = [random_object_id() for _ in range(i)]
        metadata_sizes = [np.random.randint(1000) for _ in range(i)]
        data_sizes = [np.random.randint(1000) for _ in range(i)]

        for j in range(i):
            self.plasma_client.create(
                object_ids[j], data_sizes[j],
                metadata=bytearray(np.random.bytes(metadata_sizes[j])))
            self.plasma_client.seal(object_ids[j])

        # Check that we received notifications for all of the objects.
        for j in range(i):
            # Assume the plasma store will not be full,
            # so we always get the data size instead of -1.
            msg_len, = header.unpack(recv_exactly(rsock, header.size))
            content = recv_exactly(rsock, msg_len)
            recv_objids, recv_dsizes, recv_msizes = (
                self.plasma_client.decode_notifications(content))
            assert object_ids[j] == recv_objids[0]
            assert data_sizes[j] == recv_dsizes[0]
            assert metadata_sizes[j] == recv_msizes[0]
752 | ||
def test_subscribe_deletions(self):
    """Check creation and eviction notifications seen by a subscriber.

    Every created object must produce a notification carrying its data
    and metadata sizes. Every evicted object must produce a notification
    whose sizes are both -1.
    """
    # Subscribe to notifications from the Plasma Store. We use
    # plasma_client2 to make sure that all used objects will get evicted
    # properly.
    self.plasma_client2.subscribe()
    for i in self.SUBSCRIBE_TEST_SIZES:
        object_ids = [random_object_id() for _ in range(i)]
        # Add 1 to the sizes to make sure we have nonzero object sizes.
        metadata_sizes = [np.random.randint(1000) + 1 for _ in range(i)]
        data_sizes = [np.random.randint(1000) + 1 for _ in range(i)]
        expected = list(zip(object_ids, data_sizes, metadata_sizes))
        for object_id, data_size, metadata_size in expected:
            x = self.plasma_client2.create(
                object_id, data_size,
                metadata=bytearray(np.random.bytes(metadata_size)))
            self.plasma_client2.seal(object_id)
            # Release our buffer so the object is not held while it is
            # evicted below.
            del x
        # Check that we received notifications for creating all of the
        # objects, in creation order.
        for object_id, data_size, metadata_size in expected:
            recv_objid, recv_dsize, recv_msize = (
                self.plasma_client2.get_next_notification())
            assert object_id == recv_objid
            assert data_size == recv_dsize
            assert metadata_size == recv_msize

        # Check that we receive notifications for deleting all objects, as
        # we evict them. Each evict(1) is expected to free exactly the next
        # object in creation order.
        for object_id, data_size, metadata_size in expected:
            assert (self.plasma_client2.evict(1) ==
                    data_size + metadata_size)
            recv_objid, recv_dsize, recv_msize = (
                self.plasma_client2.get_next_notification())
            assert object_id == recv_objid
            assert -1 == recv_dsize
            assert -1 == recv_msize

    # Test multiple deletion notifications. The first 9 object IDs have
    # size 0, and the last has a nonzero size. When Plasma evicts 1 byte,
    # it will evict all objects, so we should receive deletion
    # notifications for each.
    num_object_ids = 10
    object_ids = [random_object_id() for _ in range(num_object_ids)]
    metadata_sizes = [0] * (num_object_ids - 1)
    data_sizes = [0] * (num_object_ids - 1)
    # Add 1 so the last object really has a nonzero size. A bare
    # np.random.randint(1000) may return 0 for both sizes, and then the
    # premise stated above would not hold.
    metadata_sizes.append(np.random.randint(1000) + 1)
    data_sizes.append(np.random.randint(1000) + 1)
    expected = list(zip(object_ids, data_sizes, metadata_sizes))
    for object_id, data_size, metadata_size in expected:
        x = self.plasma_client2.create(
            object_id, data_size,
            metadata=bytearray(np.random.bytes(metadata_size)))
        self.plasma_client2.seal(object_id)
        del x
    for object_id, data_size, metadata_size in expected:
        recv_objid, recv_dsize, recv_msize = (
            self.plasma_client2.get_next_notification())
        assert object_id == recv_objid
        assert data_size == recv_dsize
        assert metadata_size == recv_msize
    assert (self.plasma_client2.evict(1) ==
            data_sizes[-1] + metadata_sizes[-1])
    for object_id in object_ids:
        recv_objid, recv_dsize, recv_msize = (
            self.plasma_client2.get_next_notification())
        assert object_id == recv_objid
        assert -1 == recv_dsize
        assert -1 == recv_msize
819 | ||
def test_use_full_memory(self):
    """Exercise the store at, and just beyond, its full capacity.

    Many small objects are created and dropped first. Objects as large
    as the whole store must still be creatable afterwards. A request
    larger than the store must raise PlasmaStoreFull on every attempt.
    """
    small_upper_bound = DEFAULT_PLASMA_STORE_MEMORY // 20
    # Fill the object store up with a large number of small objects and
    # let them go out of scope.
    for _ in range(100):
        create_object(self.plasma_client2,
                      np.random.randint(1, small_upper_bound), 0)
    # Objects that need the full object store size must fit.
    for _ in range(2):
        create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0)
    # An oversized request must fail with the right error. Retrying the
    # same ID checks that a failed create does not register the object ID
    # prematurely.
    oversized = DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE
    object_id = random_object_id()
    for _attempt in range(3):
        with pytest.raises(pa.plasma.PlasmaStoreFull):
            self.plasma_client2.create(object_id, oversized)
839 | ||
@staticmethod
def _client_blocked_in_get(plasma_store_name, object_id):
    """Connect to the store and wait in get() for a missing object.

    Used as a ``multiprocessing.Process`` target. The object is not
    created before the caller kills this process.
    """
    import pyarrow.plasma as plasma
    # The object ID does not exist yet, so this call should block.
    plasma.connect(plasma_store_name).get([object_id])
846 | ||
def test_client_death_during_get(self):
    """Check that the store survives a client killed while blocked in get().

    A child process waits in ``get()`` on an object that does not exist
    yet. The child is killed, then the object is created and the store is
    queried to verify it is still alive after the disconnect.
    """
    object_id = random_object_id()

    p = multiprocessing.Process(target=self._client_blocked_in_get,
                                args=(self.plasma_store_name, object_id))
    p.start()
    try:
        # Make sure the process is running (presumably blocked in get()).
        time.sleep(0.2)
        assert p.is_alive()
    finally:
        # Kill the client process, even if the check above failed, so it
        # cannot outlive the test. Join it so it does not linger as a
        # zombie and is known to be gone before we continue.
        p.terminate()
        p.join(timeout=5)
    # Wait a little for the store to process the disconnect event.
    time.sleep(0.1)

    # Create the object the killed client was waiting for.
    self.plasma_client.put(1, object_id=object_id)

    # Check that the store is still alive. This will raise an exception if
    # the store is dead.
    self.plasma_client.contains(random_object_id())
868 | ||
@staticmethod
def _client_get_multiple(plasma_store_name, object_ids):
    """Connect to the store and wait in get() for several objects.

    Used as a ``multiprocessing.Process`` target.
    """
    import pyarrow.plasma as plasma
    # None of the requested object IDs exist yet, so this call should
    # block until the caller has created them.
    plasma.connect(plasma_store_name).get(object_ids)
875 | ||
def test_client_getting_multiple_objects(self):
    """A client blocked on several objects returns once all of them exist.

    A child process waits in ``get()`` on ten missing object IDs. The
    objects are created one by one, and the child must then finish within
    5 seconds.

    Raises:
        Exception: if the blocked client has not finished within 5 seconds.
    """
    object_ids = [random_object_id() for _ in range(10)]

    p = multiprocessing.Process(target=self._client_get_multiple,
                                args=(self.plasma_store_name, object_ids))
    p.start()
    try:
        # Make sure the process is running.
        time.sleep(0.2)
        assert p.is_alive()

        # Create the objects one by one.
        for object_id in object_ids:
            self.plasma_client.put(1, object_id=object_id)

        # Check that the store is still alive. This will raise an exception
        # if the store is dead.
        self.plasma_client.contains(random_object_id())

        # Make sure that the blocked client finishes. join() sleeps until
        # the child exits or the timeout elapses, instead of spinning on
        # is_alive().
        p.join(timeout=5)
        if p.is_alive():
            raise Exception("Timing out while waiting for blocked client "
                            "to finish.")
    finally:
        # Never leave the child running or unreaped, pass or fail.
        if p.is_alive():
            p.terminate()
        p.join(timeout=5)
902 | ||
903 | ||
@pytest.mark.plasma
class TestEvictionToExternalStore:
    """Eviction tests against a store configured with an external store.

    Each test gets a fresh store with 1000 KiB of memory and
    ``EXTERNAL_STORE`` as its external store, plus a connected client.
    """

    def setup_method(self, test_method):
        import pyarrow.plasma as plasma
        # Start Plasma store.
        self.plasma_store_ctx = plasma.start_plasma_store(
            plasma_store_memory=1000 * 1024,
            use_valgrind=USE_VALGRIND,
            external_store=EXTERNAL_STORE)
        self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
        # Connect to Plasma. pytest does not call teardown_method when
        # setup_method raises, so release the store context here on
        # failure instead of leaking the started store.
        try:
            self.plasma_client = plasma.connect(self.plasma_store_name)
        except BaseException:
            self.plasma_store_ctx.__exit__(None, None, None)
            raise

    def teardown_method(self, test_method):
        """Check the store survived, stop it, and always exit its context."""
        try:
            # Check that the Plasma store is still alive.
            assert self.p.poll() is None
            self.p.send_signal(signal.SIGTERM)
            self.p.wait(timeout=5)
        finally:
            self.plasma_store_ctx.__exit__(None, None, None)

    def test_eviction(self):
        """Objects pushed out of memory can still be fetched.

        20 objects of 100 KiB each are written to a 1000 KiB store, then
        read back in order.
        """
        client = self.plasma_client

        num_objects = 20
        object_ids = [random_object_id() for _ in range(num_objects)]
        data = b'x' * 100 * 1024
        metadata = b''

        for object_id in object_ids:
            # Test for object non-existence.
            assert not client.contains(object_id)

            # Create and seal the object.
            client.create_and_seal(object_id, data, metadata)

            # Test that the client can get the object.
            assert client.contains(object_id)

        for object_id in object_ids:
            # Since we are accessing objects sequentially, every object we
            # access would be a cache "miss" owing to LRU eviction.
            # Try and access the object from the plasma store first, and
            # then try external store on failure. This should succeed to
            # fetch the object. However, it may evict the next few objects.
            [result] = client.get_buffers([object_id])
            assert result.to_pybytes() == data

        # Make sure we still cannot fetch objects that do not exist
        [result] = client.get_buffers([random_object_id()], timeout_ms=100)
        assert result is None
952 | ||
953 | # Make sure we still cannot fetch objects that do not exist | |
954 | [result] = client.get_buffers([random_object_id()], timeout_ms=100) | |
955 | assert result is None | |
956 | ||
957 | ||
@pytest.mark.plasma
def test_object_id_size():
    """ObjectID rejects an invalid id and accepts a 20-byte one."""
    import pyarrow.plasma as plasma
    # "hello" (a 5-character str) is not a valid object ID.
    with pytest.raises(ValueError):
        plasma.ObjectID("hello")
    # Twenty bytes form a valid object ID; this must not raise.
    valid_bytes = b"0" * 20
    plasma.ObjectID(valid_bytes)
964 | ||
965 | ||
@pytest.mark.plasma
def test_object_id_equality_operators():
    """ObjectID ==/!= compare ID bytes and handle non-ObjectID operands."""
    import pyarrow.plasma as plasma

    all_zeros = 20 * b'0'
    first = plasma.ObjectID(all_zeros)
    same_as_first = plasma.ObjectID(all_zeros)
    differs_in_last_byte = plasma.ObjectID(19 * b'0' + b'1')

    assert first == same_as_first
    assert same_as_first != differs_in_last_byte
    # Comparing against an unrelated type must report "not equal".
    assert first != 'foo'
977 | ||
978 | ||
@pytest.mark.xfail(reason="often fails on travis")
@pytest.mark.skipif(not os.path.exists("/mnt/hugepages"),
                    reason="requires hugepage support")
def test_use_huge_pages():
    """Create an object in a store backed by /mnt/hugepages."""
    import pyarrow.plasma as plasma
    store_ctx = plasma.start_plasma_store(
        plasma_store_memory=2 * 10 ** 9,
        plasma_directory="/mnt/hugepages",
        use_hugepages=True)
    with store_ctx as (plasma_store_name, _store_proc):
        client = plasma.connect(plasma_store_name)
        create_object(client, 10 ** 8)
990 | ||
991 | ||
# Regression test for ARROW-2448: a plasma client must not be torn down
# while PlasmaBuffers that hold handles to it are still alive.
@pytest.mark.plasma
def test_plasma_client_sharing():
    """Deleting the client before a buffer obtained from it must be safe."""
    import pyarrow.plasma as plasma

    store_ctx = plasma.start_plasma_store(
        plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY)
    with store_ctx as (plasma_store_name, _store_proc):
        client = plasma.connect(plasma_store_name)
        object_id = client.put(np.zeros(3))
        buf = client.get(object_id)
        # Drop the client first, while ``buf`` still refers to its data.
        del client
        assert (buf == np.zeros(3)).all()
        # Destroying the buffer last segfaulted before ARROW-2448.
        del buf
1008 | ||
1009 | ||
@pytest.mark.plasma
def test_plasma_list():
    """plasma_client.list() reports sizes, ref counts, state and timings."""
    import pyarrow.plasma as plasma

    store_ctx = plasma.start_plasma_store(
        plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY)
    with store_ctx as (plasma_store_name, _store_proc):
        client = plasma.connect(plasma_store_name)

        # Sizes are reported for an unsealed object.
        u, _, _ = create_object(client, 11, metadata_size=7, seal=False)
        listing = client.list()
        assert listing[u]["data_size"] == 11
        assert listing[u]["metadata_size"] == 7

        # Reference counting. put() has already released its reference.
        # Checking ref_count == 0 at this point is flaky and disabled
        # (ARROW-3344).
        v = client.put(np.zeros(3))
        held = client.get(v)
        assert client.list()[v]["ref_count"] == 1
        del held

        # State moves from "created" to "sealed" on seal().
        w, _, _ = create_object(client, 3, metadata_size=0, seal=False)
        assert client.list()[w]["state"] == "created"
        client.seal(w)
        assert client.list()[w]["state"] == "sealed"

        # Timestamps, checked with `slack` seconds of tolerance.
        slack = 1.5  # seconds
        before_create = time.time()
        x, _, _ = create_object(client, 3, metadata_size=0, seal=False)
        after_create = time.time()
        create_time = client.list()[x]["create_time"]
        assert before_create - slack <= create_time <= after_create + slack
        time.sleep(2.0)
        before_seal = time.time()
        client.seal(x)
        after_seal = time.time()
        duration = client.list()[x]["construct_duration"]
        assert before_seal - after_create - slack <= duration
        assert duration <= after_seal - before_create + slack
1058 | ||
1059 | ||
@pytest.mark.plasma
def test_object_id_randomness():
    """ObjectID.from_random() must differ between two fresh interpreters."""
    cmd = "from pyarrow import plasma; print(plasma.ObjectID.from_random())"
    # Separate processes catch a generator that would be seeded
    # identically on every interpreter start.
    first, second = (subprocess.check_output([sys.executable, "-c", cmd])
                     for _ in range(2))
    assert first != second
1066 | ||
1067 | ||
@pytest.mark.plasma
def test_store_capacity():
    """store_capacity() reports the memory the store was started with."""
    import pyarrow.plasma as plasma
    capacity = 10000
    with plasma.start_plasma_store(plasma_store_memory=capacity) as (name, _):
        client = plasma.connect(name)
        assert client.store_capacity() == capacity