]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/python/pyarrow/plasma.py
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / python / pyarrow / plasma.py
CommitLineData
1d09f67e
TL
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18
19import contextlib
20import os
21import pyarrow as pa
22import shutil
23import subprocess
24import sys
25import tempfile
26import time
27
28from pyarrow._plasma import (ObjectID, ObjectNotAvailable, # noqa
29 PlasmaBuffer, PlasmaClient, connect,
30 PlasmaObjectExists, PlasmaObjectNotFound,
31 PlasmaStoreFull)
32
33
34# The Plasma TensorFlow Operator needs to be compiled on the end user's
35# machine since the TensorFlow ABI is not stable between versions.
36# The following code checks if the operator is already present. If not,
37# the function build_plasma_tensorflow_op can be used to compile it.
38
39
40TF_PLASMA_OP_PATH = os.path.join(pa.__path__[0], "tensorflow", "plasma_op.so")
41
42
43tf_plasma_op = None
44
45
46def load_plasma_tensorflow_op():
47 global tf_plasma_op
48 import tensorflow as tf
49 tf_plasma_op = tf.load_op_library(TF_PLASMA_OP_PATH)
50
51
52def build_plasma_tensorflow_op():
53 global tf_plasma_op
54 try:
55 import tensorflow as tf
56 print("TensorFlow version: " + tf.__version__)
57 except ImportError:
58 pass
59 else:
60 print("Compiling Plasma TensorFlow Op...")
61 dir_path = os.path.dirname(os.path.realpath(__file__))
62 cc_path = os.path.join(dir_path, "tensorflow", "plasma_op.cc")
63 so_path = os.path.join(dir_path, "tensorflow", "plasma_op.so")
64 tf_cflags = tf.sysconfig.get_compile_flags()
65 if sys.platform == 'darwin':
66 tf_cflags = ["-undefined", "dynamic_lookup"] + tf_cflags
67 cmd = ["g++", "-std=c++11", "-g", "-shared", cc_path,
68 "-o", so_path, "-DNDEBUG", "-I" + pa.get_include()]
69 cmd += ["-L" + dir for dir in pa.get_library_dirs()]
70 cmd += ["-lplasma", "-larrow_python", "-larrow", "-fPIC"]
71 cmd += tf_cflags
72 cmd += tf.sysconfig.get_link_flags()
73 cmd += ["-O2"]
74 if tf.test.is_built_with_cuda():
75 cmd += ["-DGOOGLE_CUDA"]
76 print("Running command " + str(cmd))
77 subprocess.check_call(cmd)
78 tf_plasma_op = tf.load_op_library(TF_PLASMA_OP_PATH)
79
80
81@contextlib.contextmanager
82def start_plasma_store(plasma_store_memory,
83 use_valgrind=False, use_profiler=False,
84 plasma_directory=None, use_hugepages=False,
85 external_store=None):
86 """Start a plasma store process.
87 Args:
88 plasma_store_memory (int): Capacity of the plasma store in bytes.
89 use_valgrind (bool): True if the plasma store should be started inside
90 of valgrind. If this is True, use_profiler must be False.
91 use_profiler (bool): True if the plasma store should be started inside
92 a profiler. If this is True, use_valgrind must be False.
93 plasma_directory (str): Directory where plasma memory mapped files
94 will be stored.
95 use_hugepages (bool): True if the plasma store should use huge pages.
96 external_store (str): External store to use for evicted objects.
97 Return:
98 A tuple of the name of the plasma store socket and the process ID of
99 the plasma store process.
100 """
101 if use_valgrind and use_profiler:
102 raise Exception("Cannot use valgrind and profiler at the same time.")
103
104 tmpdir = tempfile.mkdtemp(prefix='test_plasma-')
105 try:
106 plasma_store_name = os.path.join(tmpdir, 'plasma.sock')
107 plasma_store_executable = os.path.join(
108 pa.__path__[0], "plasma-store-server")
109 if not os.path.exists(plasma_store_executable):
110 # Fallback to sys.prefix/bin/ (conda)
111 plasma_store_executable = os.path.join(
112 sys.prefix, "bin", "plasma-store-server")
113 command = [plasma_store_executable,
114 "-s", plasma_store_name,
115 "-m", str(plasma_store_memory)]
116 if plasma_directory:
117 command += ["-d", plasma_directory]
118 if use_hugepages:
119 command += ["-h"]
120 if external_store is not None:
121 command += ["-e", external_store]
122 stdout_file = None
123 stderr_file = None
124 if use_valgrind:
125 command = ["valgrind",
126 "--track-origins=yes",
127 "--leak-check=full",
128 "--show-leak-kinds=all",
129 "--leak-check-heuristics=stdstring",
130 "--error-exitcode=1"] + command
131 proc = subprocess.Popen(command, stdout=stdout_file,
132 stderr=stderr_file)
133 time.sleep(1.0)
134 elif use_profiler:
135 command = ["valgrind", "--tool=callgrind"] + command
136 proc = subprocess.Popen(command, stdout=stdout_file,
137 stderr=stderr_file)
138 time.sleep(1.0)
139 else:
140 proc = subprocess.Popen(command, stdout=stdout_file,
141 stderr=stderr_file)
142 time.sleep(0.1)
143 rc = proc.poll()
144 if rc is not None:
145 raise RuntimeError("plasma_store exited unexpectedly with "
146 "code %d" % (rc,))
147
148 yield plasma_store_name, proc
149 finally:
150 if proc.poll() is None:
151 proc.kill()
152 shutil.rmtree(tmpdir)