]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/python/pyarrow/tests/test_plasma.py
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / python / pyarrow / tests / test_plasma.py
CommitLineData
1d09f67e
TL
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
17
18
19import multiprocessing
20import os
21import pytest
22import random
23import signal
24import struct
25import subprocess
26import sys
27import time
28
29import numpy as np
30import pyarrow as pa
31
32
# Capacity, in bytes, of the store started for each TestPlasmaClient test.
DEFAULT_PLASMA_STORE_MEMORY = 100 * 1000 * 1000
# Set PLASMA_VALGRIND=1 in the environment to start the store under valgrind.
USE_VALGRIND = os.environ.get("PLASMA_VALGRIND") == "1"
# External-store spec handed to start_plasma_store by the eviction tests.
EXTERNAL_STORE = "hashtable://test"
# Extra bytes added to allocation sizes that are expected NOT to fit.
SMALL_OBJECT_SIZE = 9000
37
38
def random_name():
    """Return a random integer in ``[0, 99999999]`` formatted as a string."""
    value = random.randint(0, 99999999)
    return "{}".format(value)
41
42
def random_object_id():
    """Return a new plasma ``ObjectID`` built from 20 random bytes."""
    from pyarrow import plasma
    raw_id = np.random.bytes(20)
    return plasma.ObjectID(raw_id)
46
47
def _randomize_bytes(buf, length):
    """Scribble random byte values over part of ``buf``.

    Writes a random value in ``[0, 255]`` to ``buf[0]``, ``buf[-1]`` and to
    100 randomly chosen indices in ``[0, length)``. Does nothing unless
    ``length`` is positive. Shared by ``generate_metadata`` and
    ``write_to_data_buffer`` so both fill buffers the same way.
    """
    if length > 0:
        buf[0] = random.randint(0, 255)
        buf[-1] = random.randint(0, 255)
        for _ in range(100):
            # The right-hand side is evaluated before the subscript, so the
            # value is drawn from the RNG before the index.
            buf[random.randint(0, length - 1)] = random.randint(0, 255)


def generate_metadata(length):
    """Return a ``bytearray`` of ``length`` bytes, mostly zero.

    The first byte, the last byte and up to 100 random positions are set to
    random values; every other byte stays zero.
    """
    metadata = bytearray(length)
    _randomize_bytes(metadata, length)
    return metadata


def write_to_data_buffer(buff, length):
    """Overwrite parts of the writable buffer ``buff`` with random bytes.

    ``buff`` is viewed (not copied) as a ``uint8`` NumPy array, so writes
    land in the caller's buffer. ``buff[0]``, ``buff[-1]`` and 100 random
    indices below ``length`` are overwritten; nothing happens when
    ``length`` is not positive.
    """
    array = np.frombuffer(buff, dtype="uint8")
    _randomize_bytes(array, length)
65
66
def create_object_with_id(client, object_id, data_size, metadata_size,
                          seal=True):
    """Create an object under ``object_id`` filled with random bytes.

    ``metadata_size`` random metadata bytes are generated and passed to
    ``client.create``; the data buffer it returns is then partially
    overwritten with random values. The object is sealed unless ``seal`` is
    false.

    Returns:
        A ``(memory_buffer, metadata)`` tuple: the buffer returned by
        ``client.create`` and the generated metadata ``bytearray``.
    """
    metadata = generate_metadata(metadata_size)
    data_buffer = client.create(object_id, data_size, metadata)
    write_to_data_buffer(data_buffer, data_size)
    if not seal:
        return data_buffer, metadata
    client.seal(object_id)
    return data_buffer, metadata
75
76
def create_object(client, data_size, metadata_size=0, seal=True):
    """Create an object with a fresh random ID and random contents.

    Thin wrapper around ``create_object_with_id`` that also picks the ID.

    Returns:
        An ``(object_id, memory_buffer, metadata)`` tuple.
    """
    new_id = random_object_id()
    data_buffer, metadata = create_object_with_id(
        client, new_id, data_size, metadata_size, seal=seal)
    return new_id, data_buffer, metadata
83
84
@pytest.mark.plasma
class TestPlasmaClient:
    """Tests for the plasma client API against a freshly started store.

    ``setup_method`` starts a store of ``DEFAULT_PLASMA_STORE_MEMORY`` bytes
    for every test and opens two independent connections to it,
    ``plasma_client`` and ``plasma_client2``.
    """

    def setup_method(self, test_method):
        """Start a plasma store and connect two clients to it."""
        import pyarrow.plasma as plasma
        # Start Plasma store.
        self.plasma_store_ctx = plasma.start_plasma_store(
            plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
            use_valgrind=USE_VALGRIND)
        self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
        # Connect to Plasma.
        self.plasma_client = plasma.connect(self.plasma_store_name)
        self.plasma_client2 = plasma.connect(self.plasma_store_name)

    def teardown_method(self, test_method):
        """Check the store survived, stop it cleanly, and release the context."""
        try:
            # Check that the Plasma store is still alive.
            assert self.p.poll() is None
            # Ensure Valgrind and/or coverage have a clean exit
            # Valgrind misses SIGTERM if it is delivered before the
            # event loop is ready; this race condition is mitigated
            # but not solved by time.sleep().
            if USE_VALGRIND:
                time.sleep(1.0)
            self.p.send_signal(signal.SIGTERM)
            self.p.wait(timeout=5)
            assert self.p.returncode == 0
        finally:
            self.plasma_store_ctx.__exit__(None, None, None)

    def test_connection_failure_raises_exception(self):
        """Connecting to a nonexistent store raises IOError."""
        import pyarrow.plasma as plasma
        # ARROW-1264
        with pytest.raises(IOError):
            plasma.connect('unknown-store-name', num_retries=1)

    def test_create(self):
        """Create an object, fill it, seal it, and read the bytes back."""
        # Create an object id string.
        object_id = random_object_id()
        # Create a new buffer and write to it.
        length = 50
        memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
                                                                length),
                                      dtype="uint8")
        for i in range(length):
            memory_buffer[i] = i % 256
        # Seal the object.
        self.plasma_client.seal(object_id)
        # Get the object.
        memory_buffer = np.frombuffer(
            self.plasma_client.get_buffers([object_id])[0], dtype="uint8")
        for i in range(length):
            assert memory_buffer[i] == i % 256

    def test_create_with_metadata(self):
        """Round-trip data and metadata for many object/metadata lengths."""
        for length in range(0, 1000, 3):
            # Create an object id string.
            object_id = random_object_id()
            # Create a random metadata string.
            metadata = generate_metadata(length)
            # Create a new buffer and write to it.
            memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
                                                                    length,
                                                                    metadata),
                                          dtype="uint8")
            for i in range(length):
                memory_buffer[i] = i % 256
            # Seal the object.
            self.plasma_client.seal(object_id)
            # Get the object.
            memory_buffer = np.frombuffer(
                self.plasma_client.get_buffers([object_id])[0], dtype="uint8")
            for i in range(length):
                assert memory_buffer[i] == i % 256
            # Get the metadata.
            metadata_buffer = np.frombuffer(
                self.plasma_client.get_metadata([object_id])[0], dtype="uint8")
            assert len(metadata) == len(metadata_buffer)
            for i in range(len(metadata)):
                assert metadata[i] == metadata_buffer[i]

    def test_create_existing(self):
        """Creating an object whose ID already exists must fail."""
        # This test is partially used to test the code path in which we create
        # an object with an ID that already exists
        length = 100
        for _ in range(1000):
            object_id = random_object_id()
            self.plasma_client.create(object_id, length,
                                      generate_metadata(length))
            # TODO(pcm): Introduce a more specific error type here.
            with pytest.raises(pa.lib.ArrowException):
                self.plasma_client.create(object_id, length,
                                          generate_metadata(length))

    def test_create_and_seal(self):
        """create_and_seal stores data and metadata, rejects duplicate IDs,
        and leaves the objects evictable."""
        # Create a bunch of objects.
        object_ids = []
        for i in range(1000):
            object_id = random_object_id()
            object_ids.append(object_id)
            self.plasma_client.create_and_seal(object_id, i * b'a', i * b'b')

        for i in range(1000):
            [data_tuple] = self.plasma_client.get_buffers([object_ids[i]],
                                                          with_meta=True)
            assert data_tuple[1].to_pybytes() == i * b'a'
            assert (self.plasma_client.get_metadata(
                [object_ids[i]])[0].to_pybytes() ==
                i * b'b')

        # Make sure that creating the same object twice raises an exception.
        object_id = random_object_id()
        self.plasma_client.create_and_seal(object_id, b'a', b'b')
        with pytest.raises(pa.plasma.PlasmaObjectExists):
            self.plasma_client.create_and_seal(object_id, b'a', b'b')

        # Make sure that these objects can be evicted. Each object below
        # takes 20% of the store (10% data + 10% metadata), so 20 of them
        # only fit if earlier ones are evicted; the first ten are expected
        # to be gone afterwards.
        big_object = DEFAULT_PLASMA_STORE_MEMORY // 10 * b'a'
        object_ids = []
        for _ in range(20):
            object_id = random_object_id()
            object_ids.append(object_id)
            # Create under the recorded ID so the contains() checks below
            # actually exercise eviction.
            self.plasma_client.create_and_seal(object_id, big_object,
                                               big_object)
        for i in range(10):
            assert not self.plasma_client.contains(object_ids[i])

    def test_get(self):
        """get_buffers honours timeouts and returns None for missing objects."""
        num_object_ids = 60
        # Test timing out of get with various timeouts.
        for timeout in [0, 10, 100, 1000]:
            object_ids = [random_object_id() for _ in range(num_object_ids)]
            results = self.plasma_client.get_buffers(object_ids,
                                                     timeout_ms=timeout)
            assert results == num_object_ids * [None]

        data_buffers = []
        metadata_buffers = []
        for i in range(num_object_ids):
            if i % 2 == 0:
                data_buffer, metadata_buffer = create_object_with_id(
                    self.plasma_client, object_ids[i], 2000, 2000)
                data_buffers.append(data_buffer)
                metadata_buffers.append(metadata_buffer)

        # Test timing out from some but not all get calls with various
        # timeouts.
        for timeout in [0, 10, 100, 1000]:
            data_results = self.plasma_client.get_buffers(object_ids,
                                                          timeout_ms=timeout)
            # metadata_results = self.plasma_client.get_metadata(
            #     object_ids, timeout_ms=timeout)
            for i in range(num_object_ids):
                if i % 2 == 0:
                    array1 = np.frombuffer(data_buffers[i // 2], dtype="uint8")
                    array2 = np.frombuffer(data_results[i], dtype="uint8")
                    np.testing.assert_equal(array1, array2)
                    # TODO(rkn): We should compare the metadata as well. But
                    # currently the types are different (e.g., memoryview
                    # versus bytearray).
                    # assert plasma.buffers_equal(
                    #     metadata_buffers[i // 2], metadata_results[i])
                else:
                    # Odd-indexed objects were never created, so this get
                    # must have timed out on them. Check this call's results,
                    # not the all-None ``results`` from the first loop.
                    assert data_results[i] is None

        # Test trying to get an object that was created by the same client but
        # not sealed.
        object_id = random_object_id()
        self.plasma_client.create(object_id, 10, b"metadata")
        assert self.plasma_client.get_buffers(
            [object_id], timeout_ms=0, with_meta=True)[0][1] is None
        assert self.plasma_client.get_buffers(
            [object_id], timeout_ms=1, with_meta=True)[0][1] is None
        self.plasma_client.seal(object_id)
        assert self.plasma_client.get_buffers(
            [object_id], timeout_ms=0, with_meta=True)[0][1] is not None

    def test_buffer_lifetime(self):
        """A RecordBatch read from plasma outlives the buffer it came from."""
        # ARROW-2195
        arr = pa.array([1, 12, 23, 3, 34], pa.int32())
        batch = pa.RecordBatch.from_arrays([arr], ['field1'])

        # Serialize RecordBatch into Plasma store
        sink = pa.MockOutputStream()
        writer = pa.RecordBatchStreamWriter(sink, batch.schema)
        writer.write_batch(batch)
        writer.close()

        object_id = random_object_id()
        data_buffer = self.plasma_client.create(object_id, sink.size())
        stream = pa.FixedSizeBufferWriter(data_buffer)
        writer = pa.RecordBatchStreamWriter(stream, batch.schema)
        writer.write_batch(batch)
        writer.close()
        self.plasma_client.seal(object_id)
        del data_buffer

        # Unserialize RecordBatch from Plasma store
        [data_buffer] = self.plasma_client2.get_buffers([object_id])
        reader = pa.RecordBatchStreamReader(data_buffer)
        read_batch = reader.read_next_batch()
        # Lose reference to returned buffer. The RecordBatch must still
        # be backed by valid memory.
        del data_buffer, reader

        assert read_batch.equals(batch)

    def test_put_and_get(self):
        """put/get round-trips Python values; missing IDs give a sentinel."""
        for value in [["hello", "world", 3, 1.0], None, "hello"]:
            object_id = self.plasma_client.put(value)
            [result] = self.plasma_client.get([object_id])
            assert result == value

            result = self.plasma_client.get(object_id)
            assert result == value

        object_id = random_object_id()
        [result] = self.plasma_client.get([object_id], timeout_ms=0)
        assert result == pa.plasma.ObjectNotAvailable

    @pytest.mark.filterwarnings(
        "ignore:'pyarrow.deserialize':FutureWarning")
    def test_put_and_get_raw_buffer(self):
        """Raw buffers tagged with metadata round-trip next to put() values."""
        temp_id = random_object_id()
        use_meta = b"RAW"

        def deserialize_or_output(data_tuple):
            # data_tuple is (metadata, data) as returned with with_meta=True.
            if data_tuple[0] == use_meta:
                return data_tuple[1].to_pybytes()
            else:
                if data_tuple[1] is None:
                    return pa.plasma.ObjectNotAvailable
                else:
                    return pa.deserialize(data_tuple[1])

        for value in [b"Bytes Test", temp_id.binary(), 10 * b"\x00", 123]:
            if isinstance(value, bytes):
                object_id = self.plasma_client.put_raw_buffer(
                    value, metadata=use_meta)
            else:
                object_id = self.plasma_client.put(value)
            [result] = self.plasma_client.get_buffers([object_id],
                                                      with_meta=True)
            result = deserialize_or_output(result)
            assert result == value

        object_id = random_object_id()
        [result] = self.plasma_client.get_buffers([object_id],
                                                  timeout_ms=0,
                                                  with_meta=True)
        result = deserialize_or_output(result)
        assert result == pa.plasma.ObjectNotAvailable

    @pytest.mark.filterwarnings(
        "ignore:'serialization_context':FutureWarning")
    def test_put_and_get_serialization_context(self):
        """Custom types need a SerializationContext for both put and get."""

        class CustomType:
            def __init__(self, val):
                self.val = val

        val = CustomType(42)

        with pytest.raises(pa.ArrowSerializationError):
            self.plasma_client.put(val)

        serialization_context = pa.lib.SerializationContext()
        serialization_context.register_type(CustomType, 20*"\x00")

        object_id = self.plasma_client.put(
            val, None, serialization_context=serialization_context)

        with pytest.raises(pa.ArrowSerializationError):
            self.plasma_client.get(object_id)

        result = self.plasma_client.get(
            object_id, -1, serialization_context=serialization_context)
        assert result.val == val.val

    def test_store_arrow_objects(self):
        """An Arrow tensor written into a plasma buffer reads back equal."""
        data = np.random.randn(10, 4)
        # Write an arrow object.
        object_id = random_object_id()
        tensor = pa.Tensor.from_numpy(data)
        data_size = pa.ipc.get_tensor_size(tensor)
        buf = self.plasma_client.create(object_id, data_size)
        stream = pa.FixedSizeBufferWriter(buf)
        pa.ipc.write_tensor(tensor, stream)
        self.plasma_client.seal(object_id)
        # Read the arrow object.
        [tensor] = self.plasma_client.get_buffers([object_id])
        reader = pa.BufferReader(tensor)
        array = pa.ipc.read_tensor(reader).to_numpy()
        # Assert that they are equal.
        np.testing.assert_equal(data, array)

    @pytest.mark.pandas
    def test_store_pandas_dataframe(self):
        """A DataFrame stored as an Arrow record batch reads back equal."""
        import pandas as pd
        import pyarrow.plasma as plasma
        d = {'one': pd.Series([1., 2., 3.], index=['a', 'b', 'c']),
             'two': pd.Series([1., 2., 3., 4.], index=['a', 'b', 'c', 'd'])}
        df = pd.DataFrame(d)

        # Write the DataFrame.
        record_batch = pa.RecordBatch.from_pandas(df)
        # Determine the size.
        s = pa.MockOutputStream()
        stream_writer = pa.RecordBatchStreamWriter(s, record_batch.schema)
        stream_writer.write_batch(record_batch)
        data_size = s.size()
        object_id = plasma.ObjectID(np.random.bytes(20))

        buf = self.plasma_client.create(object_id, data_size)
        stream = pa.FixedSizeBufferWriter(buf)
        stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
        stream_writer.write_batch(record_batch)

        self.plasma_client.seal(object_id)

        # Read the DataFrame.
        [data] = self.plasma_client.get_buffers([object_id])
        reader = pa.RecordBatchStreamReader(pa.BufferReader(data))
        result = reader.read_next_batch().to_pandas()

        pd.testing.assert_frame_equal(df, result)

    def test_pickle_object_ids(self):
        """ObjectIDs survive a pickle round trip."""
        # This can be used for sharing object IDs between processes.
        import pickle
        object_id = random_object_id()
        data = pickle.dumps(object_id)
        object_id2 = pickle.loads(data)
        assert object_id == object_id2

    def test_store_full(self):
        """create() fails once a request exceeds the remaining space."""
        # The store is started with DEFAULT_PLASMA_STORE_MEMORY bytes (10**8,
        # i.e. 100 MB), so make sure that create throws an exception when it
        # is full.
        def assert_create_raises_plasma_full(unit_test, size):
            # Split ``size`` randomly between data and metadata.
            partial_size = np.random.randint(size)
            # TODO(pcm): More specific error here.
            with pytest.raises(pa.lib.ArrowException):
                create_object(unit_test.plasma_client, partial_size,
                              size - partial_size)

        PERCENT = DEFAULT_PLASMA_STORE_MEMORY // 100

        # Create a list to keep some of the buffers in scope.
        memory_buffers = []
        _, memory_buffer, _ = create_object(self.plasma_client, 50 * PERCENT)
        memory_buffers.append(memory_buffer)
        # Remaining space is 50%. Make sure that we can't create an
        # object of size 50% + 1, but we can create one of size 20%.
        assert_create_raises_plasma_full(
            self, 50 * PERCENT + SMALL_OBJECT_SIZE)
        _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
        del memory_buffer
        _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
        del memory_buffer
        assert_create_raises_plasma_full(
            self, 50 * PERCENT + SMALL_OBJECT_SIZE)

        _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
        memory_buffers.append(memory_buffer)
        # Remaining space is 30%.
        assert_create_raises_plasma_full(
            self, 30 * PERCENT + SMALL_OBJECT_SIZE)

        _, memory_buffer, _ = create_object(self.plasma_client, 10 * PERCENT)
        memory_buffers.append(memory_buffer)
        # Remaining space is 20%.
        assert_create_raises_plasma_full(
            self, 20 * PERCENT + SMALL_OBJECT_SIZE)

    def test_contains(self):
        """contains() is True only for created-and-sealed objects."""
        fake_object_ids = [random_object_id() for _ in range(100)]
        real_object_ids = [random_object_id() for _ in range(100)]
        for object_id in real_object_ids:
            assert self.plasma_client.contains(object_id) is False
            self.plasma_client.create(object_id, 100)
            self.plasma_client.seal(object_id)
            assert self.plasma_client.contains(object_id)
        for object_id in fake_object_ids:
            assert not self.plasma_client.contains(object_id)
        for object_id in real_object_ids:
            assert self.plasma_client.contains(object_id)

    def test_hash(self):
        """hash() is deterministic and sensitive to both data and metadata."""
        # Check the hash of an object that doesn't exist.
        object_id1 = random_object_id()
        # TODO(pcm): Introduce a more specific error type here
        with pytest.raises(pa.lib.ArrowException):
            self.plasma_client.hash(object_id1)

        length = 1000
        # Create a random object, and check that the hash function always
        # returns the same value.
        metadata = generate_metadata(length)
        memory_buffer = np.frombuffer(self.plasma_client.create(object_id1,
                                                                length,
                                                                metadata),
                                      dtype="uint8")
        for i in range(length):
            memory_buffer[i] = i % 256
        self.plasma_client.seal(object_id1)
        assert (self.plasma_client.hash(object_id1) ==
                self.plasma_client.hash(object_id1))

        # Create a second object with the same value as the first, and check
        # that their hashes are equal.
        object_id2 = random_object_id()
        memory_buffer = np.frombuffer(self.plasma_client.create(object_id2,
                                                                length,
                                                                metadata),
                                      dtype="uint8")
        for i in range(length):
            memory_buffer[i] = i % 256
        self.plasma_client.seal(object_id2)
        assert (self.plasma_client.hash(object_id1) ==
                self.plasma_client.hash(object_id2))

        # Create a third object with a different value from the first two, and
        # check that its hash is different.
        object_id3 = random_object_id()
        metadata = generate_metadata(length)
        memory_buffer = np.frombuffer(self.plasma_client.create(object_id3,
                                                                length,
                                                                metadata),
                                      dtype="uint8")
        for i in range(length):
            memory_buffer[i] = (i + 1) % 256
        self.plasma_client.seal(object_id3)
        assert (self.plasma_client.hash(object_id1) !=
                self.plasma_client.hash(object_id3))

        # Create a fourth object with the same value as the third, but
        # different metadata. Check that its hash is different from any of the
        # previous three.
        object_id4 = random_object_id()
        metadata4 = generate_metadata(length)
        memory_buffer = np.frombuffer(self.plasma_client.create(object_id4,
                                                                length,
                                                                metadata4),
                                      dtype="uint8")
        for i in range(length):
            memory_buffer[i] = (i + 1) % 256
        self.plasma_client.seal(object_id4)
        assert (self.plasma_client.hash(object_id1) !=
                self.plasma_client.hash(object_id4))
        assert (self.plasma_client.hash(object_id3) !=
                self.plasma_client.hash(object_id4))

    def test_many_hashes(self):
        """Many near-identical objects all get distinct hashes."""
        hashes = []
        length = 2 ** 10

        # Create objects whose bytes are all equal to i, for each byte value.
        for i in range(256):
            object_id = random_object_id()
            memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
                                                                    length),
                                          dtype="uint8")
            for j in range(length):
                memory_buffer[j] = i
            self.plasma_client.seal(object_id)
            hashes.append(self.plasma_client.hash(object_id))

        # Create objects of the same length, each all-zero except for a single
        # 1 at position i, so any two of them differ in exactly two bytes.
        for i in range(length):
            object_id = random_object_id()
            memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
                                                                    length),
                                          dtype="uint8")
            for j in range(length):
                memory_buffer[j] = 0
            memory_buffer[i] = 1
            self.plasma_client.seal(object_id)
            hashes.append(self.plasma_client.hash(object_id))

        # Create objects of varying length, all with value 0.
        for i in range(length):
            object_id = random_object_id()
            memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
                                                                    i),
                                          dtype="uint8")
            for j in range(i):
                memory_buffer[j] = 0
            self.plasma_client.seal(object_id)
            hashes.append(self.plasma_client.hash(object_id))

        # Check that all hashes were unique.
        assert len(set(hashes)) == 256 + length + length

    # def test_individual_delete(self):
    #     length = 100
    #     # Create an object id string.
    #     object_id = random_object_id()
    #     # Create a random metadata string.
    #     metadata = generate_metadata(100)
    #     # Create a new buffer and write to it.
    #     memory_buffer = self.plasma_client.create(object_id, length,
    #                                               metadata)
    #     for i in range(length):
    #         memory_buffer[i] = chr(i % 256)
    #     # Seal the object.
    #     self.plasma_client.seal(object_id)
    #     # Check that the object is present.
    #     assert self.plasma_client.contains(object_id)
    #     # Delete the object.
    #     self.plasma_client.delete(object_id)
    #     # Make sure the object is no longer present.
    #     self.assertFalse(self.plasma_client.contains(object_id))
    #
    # def test_delete(self):
    #     # Create some objects.
    #     object_ids = [random_object_id() for _ in range(100)]
    #     for object_id in object_ids:
    #         length = 100
    #         # Create a random metadata string.
    #         metadata = generate_metadata(100)
    #         # Create a new buffer and write to it.
    #         memory_buffer = self.plasma_client.create(object_id, length,
    #                                                   metadata)
    #         for i in range(length):
    #             memory_buffer[i] = chr(i % 256)
    #         # Seal the object.
    #         self.plasma_client.seal(object_id)
    #         # Check that the object is present.
    #         assert self.plasma_client.contains(object_id)
    #
    #     # Delete the objects and make sure they are no longer present.
    #     for object_id in object_ids:
    #         # Delete the object.
    #         self.plasma_client.delete(object_id)
    #         # Make sure the object is no longer present.
    #         self.assertFalse(self.plasma_client.contains(object_id))

    def test_illegal_functionality(self):
        """Out-of-bounds reads and writes to sealed buffers must fail."""
        # Create an object id string.
        object_id = random_object_id()
        # Create a new buffer and write to it.
        length = 1000
        memory_buffer = self.plasma_client.create(object_id, length)
        # Make sure we cannot access memory out of bounds.
        with pytest.raises(Exception):
            memory_buffer[length]
        # Seal the object.
        self.plasma_client.seal(object_id)
        # This test is commented out because it currently fails.
        # # Make sure the object is ready only now.
        # def illegal_assignment():
        #     memory_buffer[0] = chr(0)
        # with pytest.raises(Exception):
        #     illegal_assignment()
        # Get the object.
        memory_buffer = self.plasma_client.get_buffers([object_id])[0]

        # Make sure the object is read only.
        def illegal_assignment():
            memory_buffer[0] = chr(0)
        with pytest.raises(Exception):
            illegal_assignment()

    def test_evict(self):
        """evict(n) frees sealed, unreferenced objects and reports bytes."""
        client = self.plasma_client2
        object_id1 = random_object_id()
        b1 = client.create(object_id1, 1000)
        client.seal(object_id1)
        del b1
        assert client.evict(1) == 1000

        object_id2 = random_object_id()
        object_id3 = random_object_id()
        b2 = client.create(object_id2, 999)
        b3 = client.create(object_id3, 998)
        client.seal(object_id3)
        del b3
        # object_id2 is still unsealed, so only object_id3 can be evicted.
        assert client.evict(1000) == 998

        object_id4 = random_object_id()
        b4 = client.create(object_id4, 997)
        client.seal(object_id4)
        del b4
        client.seal(object_id2)
        del b2
        assert client.evict(1) == 997
        assert client.evict(1) == 999

        object_id5 = random_object_id()
        object_id6 = random_object_id()
        object_id7 = random_object_id()
        b5 = client.create(object_id5, 996)
        b6 = client.create(object_id6, 995)
        b7 = client.create(object_id7, 994)
        client.seal(object_id5)
        client.seal(object_id6)
        client.seal(object_id7)
        del b5
        del b6
        del b7
        assert client.evict(2000) == 996 + 995 + 994

    # Mitigate valgrind-induced slowness
    SUBSCRIBE_TEST_SIZES = ([1, 10, 100, 1000] if USE_VALGRIND
                            else [1, 10, 100, 1000, 10000])

    def test_subscribe(self):
        """Subscribers get one creation notification per sealed object."""
        # Subscribe to notifications from the Plasma Store.
        self.plasma_client.subscribe()
        for i in self.SUBSCRIBE_TEST_SIZES:
            object_ids = [random_object_id() for _ in range(i)]
            metadata_sizes = [np.random.randint(1000) for _ in range(i)]
            data_sizes = [np.random.randint(1000) for _ in range(i)]
            for j in range(i):
                self.plasma_client.create(
                    object_ids[j], data_sizes[j],
                    metadata=bytearray(np.random.bytes(metadata_sizes[j])))
                self.plasma_client.seal(object_ids[j])
            # Check that we received notifications for all of the objects.
            for j in range(i):
                notification_info = self.plasma_client.get_next_notification()
                recv_objid, recv_dsize, recv_msize = notification_info
                assert object_ids[j] == recv_objid
                assert data_sizes[j] == recv_dsize
                assert metadata_sizes[j] == recv_msize

    def test_subscribe_socket(self):
        """Notifications can be read and decoded from the raw socket."""
        # Subscribe to notifications from the Plasma Store.
        self.plasma_client.subscribe()
        rsock = self.plasma_client.get_notification_socket()
        for i in self.SUBSCRIBE_TEST_SIZES:
            # Get notification from socket.
            object_ids = [random_object_id() for _ in range(i)]
            metadata_sizes = [np.random.randint(1000) for _ in range(i)]
            data_sizes = [np.random.randint(1000) for _ in range(i)]

            for j in range(i):
                self.plasma_client.create(
                    object_ids[j], data_sizes[j],
                    metadata=bytearray(np.random.bytes(metadata_sizes[j])))
                self.plasma_client.seal(object_ids[j])

            # Check that we received notifications for all of the objects.
            for j in range(i):
                # Assume the plasma store will not be full,
                # so we always get the data size instead of -1.
                # The length prefix is read as an 8-byte native-endian
                # unsigned integer: 'Q' is 8 bytes on every platform, while
                # 'L' is only 8 bytes where a C long is 64-bit.
                msg_len, = struct.unpack('Q', rsock.recv(8))
                content = rsock.recv(msg_len)
                recv_objids, recv_dsizes, recv_msizes = (
                    self.plasma_client.decode_notifications(content))
                assert object_ids[j] == recv_objids[0]
                assert data_sizes[j] == recv_dsizes[0]
                assert metadata_sizes[j] == recv_msizes[0]

    def test_subscribe_deletions(self):
        """Evictions produce deletion notifications with sizes of -1."""
        # Subscribe to notifications from the Plasma Store. We use
        # plasma_client2 to make sure that all used objects will get evicted
        # properly.
        self.plasma_client2.subscribe()
        for i in self.SUBSCRIBE_TEST_SIZES:
            object_ids = [random_object_id() for _ in range(i)]
            # Add 1 to the sizes to make sure we have nonzero object sizes.
            metadata_sizes = [np.random.randint(1000) + 1 for _ in range(i)]
            data_sizes = [np.random.randint(1000) + 1 for _ in range(i)]
            for j in range(i):
                x = self.plasma_client2.create(
                    object_ids[j], data_sizes[j],
                    metadata=bytearray(np.random.bytes(metadata_sizes[j])))
                self.plasma_client2.seal(object_ids[j])
                del x
            # Check that we received notifications for creating all of the
            # objects.
            for j in range(i):
                notification_info = self.plasma_client2.get_next_notification()
                recv_objid, recv_dsize, recv_msize = notification_info
                assert object_ids[j] == recv_objid
                assert data_sizes[j] == recv_dsize
                assert metadata_sizes[j] == recv_msize

            # Check that we receive notifications for deleting all objects, as
            # we evict them.
            for j in range(i):
                assert (self.plasma_client2.evict(1) ==
                        data_sizes[j] + metadata_sizes[j])
                notification_info = self.plasma_client2.get_next_notification()
                recv_objid, recv_dsize, recv_msize = notification_info
                assert object_ids[j] == recv_objid
                assert -1 == recv_dsize
                assert -1 == recv_msize

        # Test multiple deletion notifications. The first 9 object IDs have
        # size 0, and the last has a nonzero size. When Plasma evicts 1 byte,
        # it will evict all objects, so we should receive deletion
        # notifications for each.
        num_object_ids = 10
        object_ids = [random_object_id() for _ in range(num_object_ids)]
        metadata_sizes = [0] * (num_object_ids - 1)
        data_sizes = [0] * (num_object_ids - 1)
        # Add 1 so the last object is guaranteed to be nonzero-sized.
        metadata_sizes.append(np.random.randint(1000) + 1)
        data_sizes.append(np.random.randint(1000) + 1)
        for i in range(num_object_ids):
            x = self.plasma_client2.create(
                object_ids[i], data_sizes[i],
                metadata=bytearray(np.random.bytes(metadata_sizes[i])))
            self.plasma_client2.seal(object_ids[i])
            del x
        for i in range(num_object_ids):
            notification_info = self.plasma_client2.get_next_notification()
            recv_objid, recv_dsize, recv_msize = notification_info
            assert object_ids[i] == recv_objid
            assert data_sizes[i] == recv_dsize
            assert metadata_sizes[i] == recv_msize
        assert (self.plasma_client2.evict(1) ==
                data_sizes[-1] + metadata_sizes[-1])
        for i in range(num_object_ids):
            notification_info = self.plasma_client2.get_next_notification()
            recv_objid, recv_dsize, recv_msize = notification_info
            assert object_ids[i] == recv_objid
            assert -1 == recv_dsize
            assert -1 == recv_msize

    def test_use_full_memory(self):
        """The whole store can be used once unreferenced objects are freed."""
        # Fill the object store up with a large number of small objects and let
        # them go out of scope.
        for _ in range(100):
            create_object(
                self.plasma_client2,
                np.random.randint(1, DEFAULT_PLASMA_STORE_MEMORY // 20), 0)
        # Create large objects that require the full object store size, and
        # verify that they fit.
        for _ in range(2):
            create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0)
        # Verify that an object that is too large does not fit.
        # Also verifies that the right error is thrown, and does not
        # create the object ID prematurely.
        object_id = random_object_id()
        for _ in range(3):
            with pytest.raises(pa.plasma.PlasmaStoreFull):
                self.plasma_client2.create(
                    object_id, DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE)

    @staticmethod
    def _client_blocked_in_get(plasma_store_name, object_id):
        """Child-process target: connect and call get() on one object ID."""
        import pyarrow.plasma as plasma
        client = plasma.connect(plasma_store_name)
        # Try to get an object ID that doesn't exist. This should block.
        client.get([object_id])

    def test_client_death_during_get(self):
        """The store survives a client dying while blocked in get()."""
        object_id = random_object_id()

        p = multiprocessing.Process(target=self._client_blocked_in_get,
                                    args=(self.plasma_store_name, object_id))
        p.start()
        # Make sure the process is running.
        time.sleep(0.2)
        assert p.is_alive()

        # Kill the client process and reap it so no zombie is left behind.
        p.terminate()
        p.join()
        # Wait a little for the store to process the disconnect event.
        time.sleep(0.1)

        # Create the object.
        self.plasma_client.put(1, object_id=object_id)

        # Check that the store is still alive. This will raise an exception if
        # the store is dead.
        self.plasma_client.contains(random_object_id())

    @staticmethod
    def _client_get_multiple(plasma_store_name, object_ids):
        """Child-process target: connect and call get() on several IDs."""
        import pyarrow.plasma as plasma
        client = plasma.connect(plasma_store_name)
        # Try to get an object ID that doesn't exist. This should block.
        client.get(object_ids)

    def test_client_getting_multiple_objects(self):
        """A client blocked on several objects wakes once all are created."""
        object_ids = [random_object_id() for _ in range(10)]

        p = multiprocessing.Process(target=self._client_get_multiple,
                                    args=(self.plasma_store_name, object_ids))
        p.start()
        # Make sure the process is running.
        time.sleep(0.2)
        assert p.is_alive()

        # Create the objects one by one.
        for object_id in object_ids:
            self.plasma_client.put(1, object_id=object_id)

        # Check that the store is still alive. This will raise an exception if
        # the store is dead.
        self.plasma_client.contains(random_object_id())

        # Make sure that the blocked client finishes. join() sleeps instead
        # of busy-polling; on timeout the child is killed and reaped before
        # failing so it does not outlive the test.
        p.join(timeout=5)
        if p.is_alive():
            p.terminate()
            p.join()
            raise Exception("Timing out while waiting for blocked client "
                            "to finish.")
903
@pytest.mark.plasma
class TestEvictionToExternalStore:
    """Tests for a plasma store configured with an external store.

    Each test gets its own store, started with a 1000 KiB memory limit and
    ``external_store=EXTERNAL_STORE``, plus a client connected to it.
    """

    def setup_method(self, test_method):
        """Start the store with the external backend and connect a client."""
        import pyarrow.plasma as plasma
        # Start Plasma store.
        self.plasma_store_ctx = plasma.start_plasma_store(
            plasma_store_memory=1000 * 1024,
            use_valgrind=USE_VALGRIND,
            external_store=EXTERNAL_STORE)
        self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
        try:
            # Connect to Plasma.
            self.plasma_client = plasma.connect(self.plasma_store_name)
        except BaseException:
            # pytest does not run teardown_method when setup_method fails,
            # so leave the store context here instead of leaking the process.
            self.plasma_store_ctx.__exit__(None, None, None)
            raise

    def teardown_method(self, test_method):
        """Check the store survived the test, then shut it down."""
        try:
            # Check that the Plasma store is still alive.
            assert self.p.poll() is None
            self.p.send_signal(signal.SIGTERM)
            self.p.wait(timeout=5)
        finally:
            # Always leave the start_plasma_store context, even if the
            # liveness check or the shutdown above failed.
            self.plasma_store_ctx.__exit__(None, None, None)

    def test_eviction(self):
        """Objects written past the memory limit can still be read back.

        Twenty 100 KiB objects (about twice the 1000 KiB store limit) are
        created, so they cannot all be resident in the store at once.
        """
        client = self.plasma_client

        object_ids = [random_object_id() for _ in range(20)]
        data = b'x' * 100 * 1024
        metadata = b''

        for object_id in object_ids:
            # Test for object non-existence.
            assert not client.contains(object_id)

            # Create and seal the object.
            client.create_and_seal(object_id, data, metadata)

            # Test that the client can get the object.
            assert client.contains(object_id)

        for object_id in object_ids:
            # Since we are accessing objects sequentially, every object we
            # access would be a cache "miss" owing to LRU eviction.
            # Try and access the object from the plasma store first, and then
            # try external store on failure. This should succeed to fetch the
            # object. However, it may evict the next few objects.
            [result] = client.get_buffers([object_id])
            assert result.to_pybytes() == data

        # Make sure we still cannot fetch objects that do not exist
        [result] = client.get_buffers([random_object_id()], timeout_ms=100)
        assert result is None
956
957
@pytest.mark.plasma
def test_object_id_size():
    """``ObjectID`` raises ValueError for ``"hello"`` but accepts 20 bytes."""
    from pyarrow import plasma

    twenty_bytes = b"0" * 20
    with pytest.raises(ValueError):
        plasma.ObjectID("hello")
    plasma.ObjectID(twenty_bytes)
963 plasma.ObjectID(20 * b"0")
964
965
@pytest.mark.plasma
def test_object_id_equality_operators():
    """ObjectIDs compare by value and are unequal to a plain string."""
    from pyarrow import plasma

    zeros = 20 * b'0'
    first = plasma.ObjectID(zeros)
    same_as_first = plasma.ObjectID(zeros)
    # Same length, differs only in the final byte.
    different = plasma.ObjectID(zeros[:-1] + b'1')

    assert first == same_as_first
    assert same_as_first != different
    assert first != 'foo'
977
978
@pytest.mark.plasma
@pytest.mark.xfail(reason="often fails on travis")
@pytest.mark.skipif(not os.path.exists("/mnt/hugepages"),
                    reason="requires hugepage support")
def test_use_huge_pages():
    """Start a hugepage-backed store and create a large object in it.

    The store is given 2 * 10**9 bytes under ``/mnt/hugepages``. The test
    only runs where that directory exists, and it is marked xfail because
    it is flaky on CI. It carries the ``plasma`` marker like every other
    plasma test in this module.
    """
    import pyarrow.plasma as plasma
    with plasma.start_plasma_store(
            plasma_store_memory=2 * 10**9,
            plasma_directory="/mnt/hugepages",
            use_hugepages=True) as (plasma_store_name, _):
        plasma_client = plasma.connect(plasma_store_name)
        create_object(plasma_client, 10**8)
990
991
# Regression test for ARROW-2448: a plasma client must not be destroyed
# while PlasmaBuffers that hold handles to it are still alive.
@pytest.mark.plasma
def test_plasma_client_sharing():
    """Dropping the client before a buffer obtained from it must be safe."""
    import pyarrow.plasma as plasma

    store_ctx = plasma.start_plasma_store(
        plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY)
    with store_ctx as (store_name, _proc):
        client = plasma.connect(store_name)
        oid = client.put(np.zeros(3))
        buf = client.get(oid)
        # Release our reference to the client while `buf` is still alive.
        del client
        assert (buf == np.zeros(3)).all()
        del buf  # This segfaulted pre ARROW-2448.
1008
1009
@pytest.mark.plasma
def test_plasma_list():
    """Check the per-object fields reported by ``PlasmaClient.list()``.

    Covers ``data_size`` / ``metadata_size``, ``ref_count``, ``state``
    (``"created"`` before sealing, ``"sealed"`` after) and the
    ``create_time`` / ``construct_duration`` timestamps.
    """
    import pyarrow.plasma as plasma

    with plasma.start_plasma_store(
            plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY) \
            as (plasma_store_name, p):
        plasma_client = plasma.connect(plasma_store_name)

        # Test sizes
        # The object is left unsealed; list() already reports its sizes.
        u, _, _ = create_object(plasma_client, 11, metadata_size=7, seal=False)
        l1 = plasma_client.list()
        assert l1[u]["data_size"] == 11
        assert l1[u]["metadata_size"] == 7

        # Test ref_count
        v = plasma_client.put(np.zeros(3))
        # Ref count has already been released
        # XXX flaky test, disabled (ARROW-3344)
        # l2 = plasma_client.list()
        # assert l2[v]["ref_count"] == 0
        # Holding the result of get() counts as one reference; `del a`
        # drops it again.
        a = plasma_client.get(v)
        l3 = plasma_client.list()
        assert l3[v]["ref_count"] == 1
        del a

        # Test state
        w, _, _ = create_object(plasma_client, 3, metadata_size=0, seal=False)
        l4 = plasma_client.list()
        assert l4[w]["state"] == "created"
        plasma_client.seal(w)
        l5 = plasma_client.list()
        assert l5[w]["state"] == "sealed"

        # Test timestamps
        # The statement order below matters: t1/t2 bracket creation and
        # t3/t4 bracket sealing.
        slack = 1.5  # seconds
        t1 = time.time()
        x, _, _ = create_object(plasma_client, 3, metadata_size=0, seal=False)
        t2 = time.time()
        l6 = plasma_client.list()
        # create_time is compared directly against time.time() readings.
        assert t1 - slack <= l6[x]["create_time"] <= t2 + slack
        # Sleep longer than `slack` so the lower bound below is positive.
        time.sleep(2.0)
        t3 = time.time()
        plasma_client.seal(x)
        t4 = time.time()
        l7 = plasma_client.list()
        # construct_duration must lie between the shortest possible
        # create-to-seal window (t3 - t2) and the longest (t4 - t1), each
        # widened by `slack`.
        assert t3 - t2 - slack <= l7[x]["construct_duration"]
        assert l7[x]["construct_duration"] <= t4 - t1 + slack
1058
1059
@pytest.mark.plasma
def test_object_id_randomness():
    """``ObjectID.from_random()`` must differ across fresh interpreters.

    Each ID is printed by a separate Python process. Presumably this guards
    against a random source that is seeded identically at every start-up.
    """
    cmd = "from pyarrow import plasma; print(plasma.ObjectID.from_random())"
    argv = [sys.executable, "-c", cmd]
    outputs = [subprocess.check_output(argv) for _ in range(2)]
    assert outputs[0] != outputs[1]
1066
1067
@pytest.mark.plasma
def test_store_capacity():
    """store_capacity() reports the memory limit the store was started with."""
    import pyarrow.plasma as plasma
    with plasma.start_plasma_store(plasma_store_memory=10000) as (name, p):
        plasma_client = plasma.connect(name)
        assert plasma_client.store_capacity() == 10000