]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/python/pyarrow/tests/test_plasma.py
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / python / pyarrow / tests / test_plasma.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import multiprocessing
import os
import random
import signal
import struct
import subprocess
import sys
import time

import numpy as np
import pytest

import pyarrow as pa
# Memory (in bytes) given to the store started by
# TestPlasmaClient.setup_method; many tests size their objects relative to
# it. It was referenced throughout the module but never defined.
DEFAULT_PLASMA_STORE_MEMORY = 10 ** 8
# Set the environment variable PLASMA_VALGRIND=1 to start the stores used by
# these tests under Valgrind (passed through as ``use_valgrind``).
USE_VALGRIND = os.getenv("PLASMA_VALGRIND") == "1"
# External store URI handed to the store in TestEvictionToExternalStore.
EXTERNAL_STORE = "hashtable://test"
def random_name():
    """Return a random non-negative integer in [0, 99999999] as a string."""
    number = random.randint(0, 99999999)
    return "%d" % number
def random_object_id():
    """Return a ``plasma.ObjectID`` built from 20 bytes of NumPy randomness."""
    from pyarrow import plasma
    raw_id = np.random.bytes(20)
    return plasma.ObjectID(raw_id)
def generate_metadata(length):
    """Return a ``bytearray`` of ``length`` bytes with scattered random data.

    The array starts zero-filled. When it is non-empty, the first byte, the
    last byte and 100 randomly chosen positions (repeats allowed) are
    overwritten with random values in [0, 255].
    """
    buf = bytearray(length)
    if not length > 0:
        return buf
    buf[0] = random.randint(0, 255)
    buf[-1] = random.randint(0, 255)
    for _ in range(100):
        # Draw the value before the position: in ``buf[randint()] =
        # randint()`` Python evaluates the right-hand side first, and the
        # random stream must be consumed in the same order.
        byte_value = random.randint(0, 255)
        position = random.randint(0, length - 1)
        buf[position] = byte_value
    return buf
def write_to_data_buffer(buff, length):
    """Scribble random bytes into the writable buffer ``buff`` in place.

    ``buff`` is viewed as uint8. When ``length`` is positive, the first byte,
    the last byte and 100 random positions in [0, length - 1] receive random
    values in [0, 255]. Returns None.
    """
    view = np.frombuffer(buff, dtype="uint8")
    if not length > 0:
        return
    view[0] = random.randint(0, 255)
    view[-1] = random.randint(0, 255)
    for _ in range(100):
        # Value first, then index: the same random-stream order as the
        # one-line form ``view[randint()] = randint()``.
        value = random.randint(0, 255)
        index = random.randint(0, length - 1)
        view[index] = value
def create_object_with_id(client, object_id, data_size, metadata_size,
                          seal=True):
    """Create an object under ``object_id`` filled with random bytes.

    The object gets ``data_size`` bytes of data and ``metadata_size`` bytes of
    random metadata. It is sealed unless ``seal`` is false.

    Returns:
        ``(memory_buffer, metadata)``: the buffer returned by
        ``client.create`` and the metadata ``bytearray`` it was created with.
    """
    metadata = generate_metadata(metadata_size)
    data_buffer = client.create(object_id, data_size, metadata)
    write_to_data_buffer(data_buffer, data_size)
    if not seal:
        return data_buffer, metadata
    client.seal(object_id)
    return data_buffer, metadata
def create_object(client, data_size, metadata_size=0, seal=True):
    """Create a random object under a freshly generated object ID.

    Returns:
        ``(object_id, memory_buffer, metadata)``.
    """
    new_id = random_object_id()
    data_buffer, metadata = create_object_with_id(
        client, new_id, data_size, metadata_size, seal=seal)
    return new_id, data_buffer, metadata
class TestPlasmaClient:
    """Exercise the Plasma client API against a fresh store per test.

    ``setup_method`` starts a store with ``DEFAULT_PLASMA_STORE_MEMORY`` bytes
    and opens two client connections to it (``plasma_client`` and
    ``plasma_client2``). ``teardown_method`` checks that the store survived
    and shuts it down with SIGTERM.
    """

    # Margin added to sizes that are expected NOT to fit in the store, so
    # they overshoot the remaining capacity by more than a few bytes.
    SMALL_OBJECT_SIZE = 9000

    # Mitigate valgrind-induced slowness
    SUBSCRIBE_TEST_SIZES = ([1, 10, 100, 1000] if USE_VALGRIND
                            else [1, 10, 100, 1000, 10000])

    def setup_method(self, test_method):
        import pyarrow.plasma as plasma
        # Start Plasma store.
        self.plasma_store_ctx = plasma.start_plasma_store(
            plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
            use_valgrind=USE_VALGRIND)
        self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
        # Connect to Plasma.
        self.plasma_client = plasma.connect(self.plasma_store_name)
        self.plasma_client2 = plasma.connect(self.plasma_store_name)

    def teardown_method(self, test_method):
        try:
            # Check that the Plasma store is still alive.
            assert self.p.poll() is None
            # Ensure Valgrind and/or coverage have a clean exit.
            # Valgrind misses SIGTERM if it is delivered before the
            # event loop is ready; this race condition is mitigated
            # but not solved by time.sleep(). Only pay for the delay
            # when actually running under Valgrind.
            if USE_VALGRIND:
                time.sleep(1.0)
            self.p.send_signal(signal.SIGTERM)
            self.p.wait(timeout=5)
            assert self.p.returncode == 0
        finally:
            self.plasma_store_ctx.__exit__(None, None, None)

    def test_connection_failure_raises_exception(self):
        import pyarrow.plasma as plasma
        # ARROW-1264
        with pytest.raises(IOError):
            plasma.connect('unknown-store-name', num_retries=1)

    def test_create(self):
        # Create an object id string.
        object_id = random_object_id()
        # Create a new buffer and write to it.
        length = 50
        expected = (np.arange(length) % 256).astype("uint8")
        memory_buffer = np.frombuffer(
            self.plasma_client.create(object_id, length), dtype="uint8")
        memory_buffer[:] = expected
        # Seal the object.
        self.plasma_client.seal(object_id)
        # Get the object.
        memory_buffer = np.frombuffer(
            self.plasma_client.get_buffers([object_id])[0], dtype="uint8")
        np.testing.assert_array_equal(memory_buffer, expected)

    def test_create_with_metadata(self):
        for length in range(0, 1000, 3):
            # Create an object id string.
            object_id = random_object_id()
            # Create a random metadata string.
            metadata = generate_metadata(length)
            expected = (np.arange(length) % 256).astype("uint8")
            # Create a new buffer and write to it.
            memory_buffer = np.frombuffer(
                self.plasma_client.create(object_id, length, metadata),
                dtype="uint8")
            memory_buffer[:] = expected
            # Seal the object.
            self.plasma_client.seal(object_id)
            # Get the object.
            memory_buffer = np.frombuffer(
                self.plasma_client.get_buffers([object_id])[0], dtype="uint8")
            np.testing.assert_array_equal(memory_buffer, expected)
            # Get the metadata and compare it to what we stored.
            metadata_buffer = np.frombuffer(
                self.plasma_client.get_metadata([object_id])[0], dtype="uint8")
            np.testing.assert_array_equal(
                metadata_buffer, np.frombuffer(metadata, dtype="uint8"))

    def test_create_existing(self):
        # This test is partially used to test the code path in which we create
        # an object with an ID that already exists
        length = 100
        for _ in range(1000):
            object_id = random_object_id()
            self.plasma_client.create(object_id, length,
                                      generate_metadata(length))
            # TODO(pcm): Introduce a more specific error type here.
            with pytest.raises(pa.lib.ArrowException):
                self.plasma_client.create(object_id, length,
                                          generate_metadata(length))

    def test_create_and_seal(self):
        # Create a bunch of objects.
        object_ids = []
        for i in range(1000):
            object_id = random_object_id()
            object_ids.append(object_id)
            self.plasma_client.create_and_seal(object_id, i * b'a', i * b'b')

        for i, object_id in enumerate(object_ids):
            [data_tuple] = self.plasma_client.get_buffers([object_id],
                                                          with_meta=True)
            assert data_tuple[1].to_pybytes() == i * b'a'
            assert (self.plasma_client.get_metadata(
                [object_id])[0].to_pybytes() ==
                i * b'b')

        # Make sure that creating the same object twice raises an exception.
        object_id = random_object_id()
        self.plasma_client.create_and_seal(object_id, b'a', b'b')
        with pytest.raises(pa.plasma.PlasmaObjectExists):
            self.plasma_client.create_and_seal(object_id, b'a', b'b')

        # Make sure that these objects can be evicted.
        big_object = DEFAULT_PLASMA_STORE_MEMORY // 10 * b'a'
        object_ids = []
        for _ in range(20):
            object_id = random_object_id()
            object_ids.append(object_id)
            # Create under the recorded ID; using a fresh random ID here would
            # make the eviction check below trivially true.
            self.plasma_client.create_and_seal(object_id, big_object,
                                               big_object)
        for i in range(10):
            assert not self.plasma_client.contains(object_ids[i])

    def test_get(self):
        num_object_ids = 60
        # Test timing out of get with various timeouts.
        for timeout in [0, 10, 100, 1000]:
            object_ids = [random_object_id() for _ in range(num_object_ids)]
            results = self.plasma_client.get_buffers(object_ids,
                                                     timeout_ms=timeout)
            assert results == num_object_ids * [None]

        # Create only the even-indexed objects of the last batch of IDs.
        data_buffers = []
        metadata_buffers = []
        for i in range(num_object_ids):
            if i % 2 == 0:
                data_buffer, metadata_buffer = create_object_with_id(
                    self.plasma_client, object_ids[i], 2000, 2000)
                data_buffers.append(data_buffer)
                metadata_buffers.append(metadata_buffer)

        # Test timing out from some but not all get calls with various
        # timeouts.
        for timeout in [0, 10, 100, 1000]:
            data_results = self.plasma_client.get_buffers(object_ids,
                                                          timeout_ms=timeout)
            for i in range(num_object_ids):
                if i % 2 == 0:
                    array1 = np.frombuffer(data_buffers[i // 2], dtype="uint8")
                    array2 = np.frombuffer(data_results[i], dtype="uint8")
                    np.testing.assert_equal(array1, array2)
                    # TODO(rkn): We should compare the metadata as well. But
                    # currently the types are different (e.g., memoryview
                    # versus bytearray).
                else:
                    # Odd-indexed objects were never created, so this get
                    # must have timed out on them. (Check this call's
                    # results, not the stale list from the first loop.)
                    assert data_results[i] is None

        # Test trying to get an object that was created by the same client but
        # not sealed.
        object_id = random_object_id()
        self.plasma_client.create(object_id, 10, b"metadata")
        assert self.plasma_client.get_buffers(
            [object_id], timeout_ms=0, with_meta=True)[0][1] is None
        assert self.plasma_client.get_buffers(
            [object_id], timeout_ms=1, with_meta=True)[0][1] is None
        self.plasma_client.seal(object_id)
        assert self.plasma_client.get_buffers(
            [object_id], timeout_ms=0, with_meta=True)[0][1] is not None

    def test_buffer_lifetime(self):
        # ARROW-2195
        arr = pa.array([1, 12, 23, 3, 34], pa.int32())
        batch = pa.RecordBatch.from_arrays([arr], ['field1'])

        # Serialize RecordBatch into Plasma store
        sink = pa.MockOutputStream()
        writer = pa.RecordBatchStreamWriter(sink, batch.schema)
        writer.write_batch(batch)
        writer.close()

        object_id = random_object_id()
        data_buffer = self.plasma_client.create(object_id, sink.size())
        stream = pa.FixedSizeBufferWriter(data_buffer)
        writer = pa.RecordBatchStreamWriter(stream, batch.schema)
        writer.write_batch(batch)
        writer.close()
        self.plasma_client.seal(object_id)
        del data_buffer

        # Unserialize RecordBatch from Plasma store
        [data_buffer] = self.plasma_client2.get_buffers([object_id])
        reader = pa.RecordBatchStreamReader(data_buffer)
        read_batch = reader.read_next_batch()
        # Lose reference to returned buffer. The RecordBatch must still
        # be backed by valid memory.
        del data_buffer, reader

        assert read_batch.equals(batch)

    def test_put_and_get(self):
        for value in [["hello", "world", 3, 1.0], None, "hello"]:
            object_id = self.plasma_client.put(value)
            [result] = self.plasma_client.get([object_id])
            assert result == value

            result = self.plasma_client.get(object_id)
            assert result == value

        object_id = random_object_id()
        [result] = self.plasma_client.get([object_id], timeout_ms=0)
        assert result == pa.plasma.ObjectNotAvailable

    @pytest.mark.filterwarnings(
        "ignore:'pyarrow.deserialize':FutureWarning")
    def test_put_and_get_raw_buffer(self):
        temp_id = random_object_id()
        use_meta = b"RAW"

        def deserialize_or_output(data_tuple):
            if data_tuple[0] == use_meta:
                return data_tuple[1].to_pybytes()
            else:
                if data_tuple[1] is None:
                    return pa.plasma.ObjectNotAvailable
                else:
                    return pa.deserialize(data_tuple[1])

        for value in [b"Bytes Test", temp_id.binary(), 10 * b"\x00", 123]:
            if isinstance(value, bytes):
                object_id = self.plasma_client.put_raw_buffer(
                    value, metadata=use_meta)
            else:
                object_id = self.plasma_client.put(value)
            [result] = self.plasma_client.get_buffers([object_id],
                                                      with_meta=True)
            result = deserialize_or_output(result)
            assert result == value

        object_id = random_object_id()
        [result] = self.plasma_client.get_buffers([object_id],
                                                  timeout_ms=0,
                                                  with_meta=True)
        result = deserialize_or_output(result)
        assert result == pa.plasma.ObjectNotAvailable

    @pytest.mark.filterwarnings(
        "ignore:'serialization_context':FutureWarning")
    def test_put_and_get_serialization_context(self):

        class CustomType:
            def __init__(self, val):
                self.val = val

        val = CustomType(42)

        with pytest.raises(pa.ArrowSerializationError):
            self.plasma_client.put(val)

        serialization_context = pa.lib.SerializationContext()
        serialization_context.register_type(CustomType, 20*"\x00")

        object_id = self.plasma_client.put(
            val, None, serialization_context=serialization_context)

        # Without the context the custom type cannot be deserialized.
        with pytest.raises(pa.ArrowSerializationError):
            self.plasma_client.get(object_id)

        result = self.plasma_client.get(
            object_id, -1, serialization_context=serialization_context)
        assert result.val == val.val

    def test_store_arrow_objects(self):
        data = np.random.randn(10, 4)
        # Write an arrow object.
        object_id = random_object_id()
        tensor = pa.Tensor.from_numpy(data)
        data_size = pa.ipc.get_tensor_size(tensor)
        buf = self.plasma_client.create(object_id, data_size)
        stream = pa.FixedSizeBufferWriter(buf)
        pa.ipc.write_tensor(tensor, stream)
        self.plasma_client.seal(object_id)
        # Read the arrow object.
        [tensor_buffer] = self.plasma_client.get_buffers([object_id])
        reader = pa.BufferReader(tensor_buffer)
        array = pa.ipc.read_tensor(reader).to_numpy()
        # Assert that they are equal.
        np.testing.assert_equal(data, array)

    @pytest.mark.pandas
    def test_store_pandas_dataframe(self):
        import pandas as pd
        import pyarrow.plasma as plasma
        d = {'one': pd.Series([1., 2., 3.], index=['a', 'b', 'c']),
             'two': pd.Series([1., 2., 3., 4.], index=['a', 'b', 'c', 'd'])}
        df = pd.DataFrame(d)

        # Write the DataFrame.
        record_batch = pa.RecordBatch.from_pandas(df)
        # Determine the size.
        s = pa.MockOutputStream()
        stream_writer = pa.RecordBatchStreamWriter(s, record_batch.schema)
        stream_writer.write_batch(record_batch)
        data_size = s.size()
        object_id = plasma.ObjectID(np.random.bytes(20))

        buf = self.plasma_client.create(object_id, data_size)
        stream = pa.FixedSizeBufferWriter(buf)
        stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
        stream_writer.write_batch(record_batch)

        self.plasma_client.seal(object_id)

        # Read the DataFrame.
        [data] = self.plasma_client.get_buffers([object_id])
        reader = pa.RecordBatchStreamReader(pa.BufferReader(data))
        result = reader.read_next_batch().to_pandas()

        pd.testing.assert_frame_equal(df, result)

    def test_pickle_object_ids(self):
        # This can be used for sharing object IDs between processes.
        import pickle
        object_id = random_object_id()
        data = pickle.dumps(object_id)
        object_id2 = pickle.loads(data)
        assert object_id == object_id2

    def test_store_full(self):
        # The store is started with DEFAULT_PLASMA_STORE_MEMORY bytes, so make
        # sure that create throws an exception when it is full.
        def assert_create_raises_plasma_full(unit_test, size):
            # Split ``size`` randomly between data and metadata.
            partial_size = np.random.randint(size)
            # TODO(pcm): More specific error here.
            with pytest.raises(pa.lib.ArrowException):
                create_object(unit_test.plasma_client, partial_size,
                              size - partial_size)

        percent = DEFAULT_PLASMA_STORE_MEMORY // 100
        margin = self.SMALL_OBJECT_SIZE

        # Create a list to keep some of the buffers in scope.
        memory_buffers = []
        _, memory_buffer, _ = create_object(self.plasma_client, 50 * percent)
        memory_buffers.append(memory_buffer)
        # Remaining space is 50%. Make sure that we can't create an
        # object of size 50% + 1, but we can create one of size 20%.
        assert_create_raises_plasma_full(self, 50 * percent + margin)
        _, memory_buffer, _ = create_object(self.plasma_client, 20 * percent)
        del memory_buffer
        _, memory_buffer, _ = create_object(self.plasma_client, 20 * percent)
        del memory_buffer
        assert_create_raises_plasma_full(self, 50 * percent + margin)

        _, memory_buffer, _ = create_object(self.plasma_client, 20 * percent)
        memory_buffers.append(memory_buffer)
        # Remaining space is 30%.
        assert_create_raises_plasma_full(self, 30 * percent + margin)

        _, memory_buffer, _ = create_object(self.plasma_client, 10 * percent)
        memory_buffers.append(memory_buffer)
        # Remaining space is 20%.
        assert_create_raises_plasma_full(self, 20 * percent + margin)

    def test_contains(self):
        fake_object_ids = [random_object_id() for _ in range(100)]
        real_object_ids = [random_object_id() for _ in range(100)]
        for object_id in real_object_ids:
            assert self.plasma_client.contains(object_id) is False
            self.plasma_client.create(object_id, 100)
            self.plasma_client.seal(object_id)
            assert self.plasma_client.contains(object_id)
        for object_id in fake_object_ids:
            assert not self.plasma_client.contains(object_id)
        for object_id in real_object_ids:
            assert self.plasma_client.contains(object_id)

    def test_hash(self):
        # Check the hash of an object that doesn't exist.
        object_id1 = random_object_id()
        # TODO(pcm): Introduce a more specific error type here
        with pytest.raises(pa.lib.ArrowException):
            self.plasma_client.hash(object_id1)

        length = 1000
        ascending = (np.arange(length) % 256).astype("uint8")
        shifted = ((np.arange(length) + 1) % 256).astype("uint8")

        def create_filled(object_id, metadata, contents):
            # Create a ``length``-byte object, copy ``contents`` into it and
            # seal it.
            memory_buffer = np.frombuffer(
                self.plasma_client.create(object_id, length, metadata),
                dtype="uint8")
            memory_buffer[:] = contents
            self.plasma_client.seal(object_id)

        # Create a random object, and check that the hash function always
        # returns the same value.
        metadata = generate_metadata(length)
        create_filled(object_id1, metadata, ascending)
        assert (self.plasma_client.hash(object_id1) ==
                self.plasma_client.hash(object_id1))

        # Create a second object with the same value as the first, and check
        # that their hashes are equal.
        object_id2 = random_object_id()
        create_filled(object_id2, metadata, ascending)
        assert (self.plasma_client.hash(object_id1) ==
                self.plasma_client.hash(object_id2))

        # Create a third object with a different value from the first two, and
        # check that its hash is different.
        object_id3 = random_object_id()
        metadata = generate_metadata(length)
        create_filled(object_id3, metadata, shifted)
        assert (self.plasma_client.hash(object_id1) !=
                self.plasma_client.hash(object_id3))

        # Create a fourth object with the same value as the third, but
        # different metadata. Check that its hash is different from any of the
        # previous three.
        object_id4 = random_object_id()
        metadata4 = generate_metadata(length)
        create_filled(object_id4, metadata4, shifted)
        assert (self.plasma_client.hash(object_id1) !=
                self.plasma_client.hash(object_id4))
        assert (self.plasma_client.hash(object_id3) !=
                self.plasma_client.hash(object_id4))

    def test_many_hashes(self):
        hashes = []
        length = 2 ** 10

        # Create objects of the same length, each filled with one byte value.
        for i in range(256):
            object_id = random_object_id()
            memory_buffer = np.frombuffer(
                self.plasma_client.create(object_id, length), dtype="uint8")
            memory_buffer[:] = i
            self.plasma_client.seal(object_id)
            hashes.append(self.plasma_client.hash(object_id))

        # Create objects of the same length that are all zero except for a
        # single byte set to 1, at a different position in each object.
        for i in range(length):
            object_id = random_object_id()
            memory_buffer = np.frombuffer(
                self.plasma_client.create(object_id, length), dtype="uint8")
            memory_buffer[:] = 0
            memory_buffer[i] = 1
            self.plasma_client.seal(object_id)
            hashes.append(self.plasma_client.hash(object_id))

        # Create objects of varying length, all with value 0.
        for i in range(length):
            object_id = random_object_id()
            memory_buffer = np.frombuffer(
                self.plasma_client.create(object_id, i), dtype="uint8")
            memory_buffer[:] = 0
            self.plasma_client.seal(object_id)
            hashes.append(self.plasma_client.hash(object_id))

        # Check that all hashes were unique.
        assert len(set(hashes)) == 256 + length + length

    # TODO: add coverage for deleting objects (single and multiple IDs).

    def test_illegal_functionality(self):
        # Create an object id string.
        object_id = random_object_id()
        # Create a new buffer and write to it.
        length = 1000
        memory_buffer = self.plasma_client.create(object_id, length)
        # Make sure we cannot access memory out of bounds.
        with pytest.raises(Exception):
            memory_buffer[length]
        # Seal the object.
        self.plasma_client.seal(object_id)
        # NOTE: checking that the buffer returned by create() becomes
        # read-only after seal() is not done; that check currently fails.
        # Get the object.
        memory_buffer = self.plasma_client.get_buffers([object_id])[0]

        # Make sure the object is read only.
        with pytest.raises(Exception):
            memory_buffer[0] = chr(0)

    def test_evict(self):
        client = self.plasma_client2
        object_id1 = random_object_id()
        b1 = client.create(object_id1, 1000)
        client.seal(object_id1)
        del b1
        assert client.evict(1) == 1000

        object_id2 = random_object_id()
        object_id3 = random_object_id()
        b2 = client.create(object_id2, 999)
        b3 = client.create(object_id3, 998)
        client.seal(object_id3)
        del b3
        assert client.evict(1000) == 998

        object_id4 = random_object_id()
        b4 = client.create(object_id4, 997)
        client.seal(object_id4)
        del b4
        client.seal(object_id2)
        del b2
        assert client.evict(1) == 997
        assert client.evict(1) == 999

        object_id5 = random_object_id()
        object_id6 = random_object_id()
        object_id7 = random_object_id()
        b5 = client.create(object_id5, 996)
        b6 = client.create(object_id6, 995)
        b7 = client.create(object_id7, 994)
        client.seal(object_id5)
        client.seal(object_id6)
        client.seal(object_id7)
        del b5
        del b6
        del b7
        assert client.evict(2000) == 996 + 995 + 994

    def test_subscribe(self):
        # Subscribe to notifications from the Plasma Store.
        self.plasma_client.subscribe()
        for i in self.SUBSCRIBE_TEST_SIZES:
            object_ids = [random_object_id() for _ in range(i)]
            metadata_sizes = [np.random.randint(1000) for _ in range(i)]
            data_sizes = [np.random.randint(1000) for _ in range(i)]
            for j in range(i):
                self.plasma_client.create(
                    object_ids[j], data_sizes[j],
                    metadata=bytearray(np.random.bytes(metadata_sizes[j])))
                self.plasma_client.seal(object_ids[j])
            # Check that we received notifications for all of the objects.
            for j in range(i):
                notification_info = self.plasma_client.get_next_notification()
                recv_objid, recv_dsize, recv_msize = notification_info
                assert object_ids[j] == recv_objid
                assert data_sizes[j] == recv_dsize
                assert metadata_sizes[j] == recv_msize

    def test_subscribe_socket(self):
        # Subscribe to notifications from the Plasma Store.
        self.plasma_client.subscribe()
        rsock = self.plasma_client.get_notification_socket()
        for i in self.SUBSCRIBE_TEST_SIZES:
            # Get notification from socket.
            object_ids = [random_object_id() for _ in range(i)]
            metadata_sizes = [np.random.randint(1000) for _ in range(i)]
            data_sizes = [np.random.randint(1000) for _ in range(i)]

            for j in range(i):
                self.plasma_client.create(
                    object_ids[j], data_sizes[j],
                    metadata=bytearray(np.random.bytes(metadata_sizes[j])))
                self.plasma_client.seal(object_ids[j])

            # Check that we received notifications for all of the objects.
            for j in range(i):
                # Assume the plasma store will not be full,
                # so we always get the data size instead of -1.
                # Each message is prefixed by an 8-byte length. 'q' is always
                # 8 bytes, whereas the size of native 'L' is platform
                # dependent (4 bytes on some platforms).
                msg_len, = struct.unpack('q', rsock.recv(8))
                content = rsock.recv(msg_len)
                recv_objids, recv_dsizes, recv_msizes = (
                    self.plasma_client.decode_notifications(content))
                assert object_ids[j] == recv_objids[0]
                assert data_sizes[j] == recv_dsizes[0]
                assert metadata_sizes[j] == recv_msizes[0]

    def test_subscribe_deletions(self):
        # Subscribe to notifications from the Plasma Store. We use
        # plasma_client2 to make sure that all used objects will get evicted
        # properly.
        self.plasma_client2.subscribe()
        for i in self.SUBSCRIBE_TEST_SIZES:
            object_ids = [random_object_id() for _ in range(i)]
            # Add 1 to the sizes to make sure we have nonzero object sizes.
            metadata_sizes = [np.random.randint(1000) + 1 for _ in range(i)]
            data_sizes = [np.random.randint(1000) + 1 for _ in range(i)]
            for j in range(i):
                x = self.plasma_client2.create(
                    object_ids[j], data_sizes[j],
                    metadata=bytearray(np.random.bytes(metadata_sizes[j])))
                self.plasma_client2.seal(object_ids[j])
                del x
            # Check that we received notifications for creating all of the
            # objects.
            for j in range(i):
                notification_info = self.plasma_client2.get_next_notification()
                recv_objid, recv_dsize, recv_msize = notification_info
                assert object_ids[j] == recv_objid
                assert data_sizes[j] == recv_dsize
                assert metadata_sizes[j] == recv_msize

            # Check that we receive notifications for deleting all objects, as
            # we evict them.
            for j in range(i):
                assert (self.plasma_client2.evict(1) ==
                        data_sizes[j] + metadata_sizes[j])
                notification_info = self.plasma_client2.get_next_notification()
                recv_objid, recv_dsize, recv_msize = notification_info
                assert object_ids[j] == recv_objid
                assert -1 == recv_dsize
                assert -1 == recv_msize

        # Test multiple deletion notifications. The first 9 object IDs have
        # size 0, and the last has a nonzero size. When Plasma evicts 1 byte,
        # it will evict all objects, so we should receive deletion
        # notifications for each.
        num_object_ids = 10
        object_ids = [random_object_id() for _ in range(num_object_ids)]
        metadata_sizes = [0] * (num_object_ids - 1)
        data_sizes = [0] * (num_object_ids - 1)
        metadata_sizes.append(np.random.randint(1000))
        data_sizes.append(np.random.randint(1000))
        for i in range(num_object_ids):
            x = self.plasma_client2.create(
                object_ids[i], data_sizes[i],
                metadata=bytearray(np.random.bytes(metadata_sizes[i])))
            self.plasma_client2.seal(object_ids[i])
            del x
        for i in range(num_object_ids):
            notification_info = self.plasma_client2.get_next_notification()
            recv_objid, recv_dsize, recv_msize = notification_info
            assert object_ids[i] == recv_objid
            assert data_sizes[i] == recv_dsize
            assert metadata_sizes[i] == recv_msize
        assert (self.plasma_client2.evict(1) ==
                data_sizes[-1] + metadata_sizes[-1])
        for i in range(num_object_ids):
            notification_info = self.plasma_client2.get_next_notification()
            recv_objid, recv_dsize, recv_msize = notification_info
            assert object_ids[i] == recv_objid
            assert -1 == recv_dsize
            assert -1 == recv_msize

    def test_use_full_memory(self):
        # Fill the object store up with a large number of small objects and let
        # them go out of scope.
        for _ in range(100):
            create_object(
                self.plasma_client2,
                np.random.randint(1, DEFAULT_PLASMA_STORE_MEMORY // 20), 0)
        # Create large objects that require the full object store size, and
        # verify that they fit.
        for _ in range(2):
            create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0)
        # Verify that an object that is too large does not fit.
        # Also verifies that the right error is thrown, and does not
        # create the object ID prematurely.
        object_id = random_object_id()
        for _ in range(3):
            with pytest.raises(pa.plasma.PlasmaStoreFull):
                self.plasma_client2.create(
                    object_id,
                    DEFAULT_PLASMA_STORE_MEMORY + self.SMALL_OBJECT_SIZE)

    @staticmethod
    def _client_blocked_in_get(plasma_store_name, object_id):
        import pyarrow.plasma as plasma
        client = plasma.connect(plasma_store_name)
        # Try to get an object ID that doesn't exist. This should block.
        client.get([object_id])

    def test_client_death_during_get(self):
        object_id = random_object_id()

        p = multiprocessing.Process(target=self._client_blocked_in_get,
                                    args=(self.plasma_store_name, object_id))
        p.start()
        # Make sure the process is running.
        time.sleep(0.2)
        assert p.is_alive()

        # Kill the client process and reap it so no zombie is left behind.
        p.terminate()
        p.join()
        # Wait a little for the store to process the disconnect event.
        time.sleep(0.1)

        # Create the object.
        self.plasma_client.put(1, object_id=object_id)

        # Check that the store is still alive. This will raise an exception if
        # the store is dead.
        self.plasma_client.contains(random_object_id())

    @staticmethod
    def _client_get_multiple(plasma_store_name, object_ids):
        import pyarrow.plasma as plasma
        client = plasma.connect(plasma_store_name)
        # Try to get an object ID that doesn't exist. This should block.
        client.get(object_ids)

    def test_client_getting_multiple_objects(self):
        object_ids = [random_object_id() for _ in range(10)]

        p = multiprocessing.Process(target=self._client_get_multiple,
                                    args=(self.plasma_store_name, object_ids))
        p.start()
        # Make sure the process is running.
        time.sleep(0.2)
        assert p.is_alive()

        # Create the objects one by one.
        for object_id in object_ids:
            self.plasma_client.put(1, object_id=object_id)

        # Check that the store is still alive. This will raise an exception if
        # the store is dead.
        self.plasma_client.contains(random_object_id())

        # Make sure that the blocked client finishes. Block in join() instead
        # of busy-polling is_alive(), and don't leak the child on timeout.
        p.join(timeout=5)
        if p.is_alive():
            p.terminate()
            p.join()
            raise Exception("Timing out while waiting for blocked client "
                            "to finish.")
class TestEvictionToExternalStore:
    """Tests for a small Plasma store started with ``EXTERNAL_STORE``.

    The store gets only ``1000 * 1024`` bytes, while ``test_eviction`` writes
    20 objects of 100 KiB each and then expects to read every one of them
    back.
    """

    def setup_method(self, test_method):
        import pyarrow.plasma as plasma
        # Launch a deliberately small store wired to the external store.
        self.plasma_store_ctx = plasma.start_plasma_store(
            plasma_store_memory=1000 * 1024,
            use_valgrind=USE_VALGRIND,
            external_store=EXTERNAL_STORE)
        self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
        # Single client connection used by the tests.
        self.plasma_client = plasma.connect(self.plasma_store_name)

    def teardown_method(self, test_method):
        try:
            # The store process must have survived the test.
            assert self.p.poll() is None
            self.p.send_signal(signal.SIGTERM)
            self.p.wait(timeout=5)
        finally:
            self.plasma_store_ctx.__exit__(None, None, None)

    def test_eviction(self):
        client = self.plasma_client
        num_objects = 20

        object_ids = [random_object_id() for _ in range(num_objects)]
        payload = b'x' * (100 * 1024)
        empty_metadata = b''

        for oid in object_ids:
            # The ID must be unknown before we store it ...
            assert not client.contains(oid)
            client.create_and_seal(oid, payload, empty_metadata)
            # ... and visible right after sealing.
            assert client.contains(oid)

        for oid in object_ids:
            # Walking the IDs in creation order means each access is an LRU
            # cache miss: the store looks locally first and falls back to the
            # external store, which must succeed (possibly evicting the next
            # few objects in turn).
            [fetched] = client.get_buffers([oid])
            assert fetched.to_pybytes() == payload

        # An ID that was never stored must still come back empty.
        [missing] = client.get_buffers([random_object_id()], timeout_ms=100)
        assert missing is None
def test_object_id_size():
    """ObjectID("hello") raises ValueError; a 20-byte value is accepted."""
    import pyarrow.plasma as plasma
    object_id_cls = plasma.ObjectID
    with pytest.raises(ValueError):
        object_id_cls("hello")
    # A 20-byte ID must construct without error.
    object_id_cls(b"0" * 20)
def test_object_id_equality_operators():
    """ObjectID == / != compare by ID bytes and handle non-ObjectID values."""
    import pyarrow.plasma as plasma

    zeros = 20 * b'0'
    first = plasma.ObjectID(zeros)
    same = plasma.ObjectID(zeros)
    # Same length, differing only in the final byte.
    different = plasma.ObjectID(zeros[:-1] + b'1')

    assert first == same
    assert same != different
    # Comparing against an unrelated type reports "not equal".
    assert first != 'foo'
@pytest.mark.xfail(reason="often fails on travis")
@pytest.mark.skipif(not os.path.exists("/mnt/hugepages"),
                    reason="requires hugepage support")
def test_use_huge_pages():
    """Create a 10**8-byte object in a store backed by /mnt/hugepages."""
    import pyarrow.plasma as plasma
    store_options = dict(
        plasma_store_memory=2 * 10**9,
        plasma_directory="/mnt/hugepages",
        use_hugepages=True,
    )
    store = plasma.start_plasma_store(**store_options)
    with store as (store_name, _store_proc):
        client = plasma.connect(store_name)
        create_object(client, 10**8)
def test_plasma_client_sharing():
    """Regression test for ARROW-2448.

    A plasma client must not be destroyed before all the PlasmaBuffers that
    hold handles to it are destroyed. Here the client is deleted while a
    buffer obtained from it is still alive; the buffer must remain readable,
    and deleting it afterwards must not crash.
    """
    import pyarrow.plasma as plasma

    store = plasma.start_plasma_store(
        plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY)
    with store as (store_name, _store_proc):
        client = plasma.connect(store_name)
        stored_id = client.put(np.zeros(3))
        buf = client.get(stored_id)
        # Drop the client first; the buffer still references it.
        del client
        assert (buf == np.zeros(3)).all()
        del buf  # This segfaulted pre ARROW-2448.
def test_plasma_list():
    """Check the per-object fields reported by the client's ``list()``:
    data/metadata sizes, ref_count, state, create_time, and
    construct_duration."""
    import pyarrow.plasma as plasma

    store = plasma.start_plasma_store(
        plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY)
    with store as (store_name, _store_proc):
        client = plasma.connect(store_name)

        # Sizes: an unsealed object reports its data and metadata sizes.
        sized_id, _, _ = create_object(client, 11, metadata_size=7,
                                       seal=False)
        sized_entry = client.list()[sized_id]
        assert sized_entry["data_size"] == 11
        assert sized_entry["metadata_size"] == 7

        # ref_count: holding the result of get() counts as one reference.
        held_id = client.put(np.zeros(3))
        # A check that ref_count is 0 right after put() was disabled as
        # flaky (ARROW-3344).
        held = client.get(held_id)
        assert client.list()[held_id]["ref_count"] == 1
        del held

        # State: "created" until sealed, "sealed" afterwards.
        state_id, _, _ = create_object(client, 3, metadata_size=0,
                                       seal=False)
        assert client.list()[state_id]["state"] == "created"
        client.seal(state_id)
        assert client.list()[state_id]["state"] == "sealed"

        # Timestamps are compared against wall-clock bounds with some slack.
        slack = 1.5  # seconds
        before_create = time.time()
        timed_id, _, _ = create_object(client, 3, metadata_size=0,
                                       seal=False)
        after_create = time.time()
        create_time = client.list()[timed_id]["create_time"]
        assert before_create - slack <= create_time <= after_create + slack
        time.sleep(2.0)
        before_seal = time.time()
        client.seal(timed_id)
        after_seal = time.time()
        duration = client.list()[timed_id]["construct_duration"]
        assert before_seal - after_create - slack <= duration
        assert duration <= after_seal - before_create + slack
def test_object_id_randomness():
    """ObjectID.from_random() differs across separate interpreter processes.

    Each ID is printed by a fresh Python subprocess, so two independent
    processes must not produce the same ID.
    """
    snippet = ("from pyarrow import plasma; "
               "print(plasma.ObjectID.from_random())")
    argv = [sys.executable, "-c", snippet]
    first, second = (subprocess.check_output(argv) for _ in range(2))
    assert first != second
def test_store_capacity():
    """store_capacity() reports the memory the store was started with."""
    import pyarrow.plasma as plasma
    capacity = 10000
    store = plasma.start_plasma_store(plasma_store_memory=capacity)
    with store as (store_name, _store_proc):
        client = plasma.connect(store_name)
        assert client.store_capacity() == capacity