# ceph/src/pybind/mgr/volumes/fs/operations/volume.py
import errno
import logging
import sys
import time

from contextlib import contextmanager
from threading import Lock, Condition
from typing import no_type_check

if sys.version_info >= (3, 3):
    from threading import Timer
else:
    from threading import _Timer as Timer

import cephfs
import orchestrator

from .lock import GlobalLock
from ..exception import VolumeException
from ..fs_util import create_pool, remove_pool, create_filesystem, \
    remove_filesystem, create_mds, volume_exists

log = logging.getLogger(__name__)


class ConnectionPool(object):
    class Connection(object):
        def __init__(self, mgr, fs_name):
            self.fs = None
            self.mgr = mgr
            self.fs_name = fs_name
            self.ops_in_progress = 0
            self.last_used = time.time()
            self.fs_id = self.get_fs_id()

        def get_fs_id(self):
            fs_map = self.mgr.get('fs_map')
            for fs in fs_map['filesystems']:
                if fs['mdsmap']['fs_name'] == self.fs_name:
                    return fs['id']
            raise VolumeException(
                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))

        def get_fs_handle(self):
            self.last_used = time.time()
            self.ops_in_progress += 1
            return self.fs

        def put_fs_handle(self, notify):
            assert self.ops_in_progress > 0
            self.ops_in_progress -= 1
            if self.ops_in_progress == 0:
                notify()

        def del_fs_handle(self, waiter):
            if waiter:
                while self.ops_in_progress != 0:
                    waiter()
            if self.is_connection_valid():
                self.disconnect()
            else:
                self.abort()

        def is_connection_valid(self):
            fs_id = None
            try:
                fs_id = self.get_fs_id()
            except VolumeException:
                # the filesystem does not exist now -- connection is not valid.
                pass
            log.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
            return self.fs_id == fs_id

        def is_connection_idle(self, timeout):
            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))

        def connect(self):
            assert self.ops_in_progress == 0
            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
            log.debug("Setting user ID and group ID of CephFS mount as root...")
            self.fs.conf_set("client_mount_uid", "0")
            self.fs.conf_set("client_mount_gid", "0")
            log.debug("CephFS initializing...")
            self.fs.init()
            log.debug("CephFS mounting...")
            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
            self.mgr._ceph_register_client(self.fs.get_addrs())

        def disconnect(self):
            try:
                assert self.ops_in_progress == 0
                log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
                addrs = self.fs.get_addrs()
                self.fs.shutdown()
                self.mgr._ceph_unregister_client(addrs)
                self.fs = None
            except Exception as e:
                log.debug("disconnect: ({0})".format(e))
                raise

        def abort(self):
            assert self.ops_in_progress == 0
            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
            self.fs.abort_conn()
            log.info("abort done from cephfs '{0}'".format(self.fs_name))
            self.fs = None

    class RTimer(Timer):
        """
        recurring timer variant of Timer
        """
        @no_type_check
        def run(self):
            try:
                while not self.finished.is_set():
                    self.finished.wait(self.interval)
                    self.function(*self.args, **self.kwargs)
                self.finished.set()
            except Exception as e:
                log.error("ConnectionPool.RTimer: %s", e)
                raise

    # TODO: make this configurable
    TIMER_TASK_RUN_INTERVAL = 30.0   # seconds
    CONNECTION_IDLE_INTERVAL = 60.0  # seconds

    def __init__(self, mgr):
        self.mgr = mgr
        self.connections = {}
        self.lock = Lock()
        self.cond = Condition(self.lock)
        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                                self.cleanup_connections)
        self.timer_task.start()

    def cleanup_connections(self):
        with self.lock:
            log.info("scanning for idle connections..")
            idle_fs = [fs_name for fs_name, conn in self.connections.items()
                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
            for fs_name in idle_fs:
                log.info("cleaning up connection for '{}'".format(fs_name))
                self._del_fs_handle(fs_name)

    def get_fs_handle(self, fs_name):
        with self.lock:
            try:
                conn = self.connections.get(fs_name, None)
                if conn:
                    if conn.is_connection_valid():
                        return conn.get_fs_handle()
                    # filesystem id changed beneath us (or the filesystem does not exist).
                    # this is possible if the filesystem got removed (and recreated with
                    # same name) via "ceph fs rm/new" mon command.
                    log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
                    self._del_fs_handle(fs_name)
                conn = ConnectionPool.Connection(self.mgr, fs_name)
                conn.connect()
            except cephfs.Error as e:
                # try to provide a better error string if possible
                if e.args[0] == errno.ENOENT:
                    raise VolumeException(
                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
                raise VolumeException(-e.args[0], e.args[1])
            self.connections[fs_name] = conn
            return conn.get_fs_handle()

    def put_fs_handle(self, fs_name):
        with self.lock:
            conn = self.connections.get(fs_name, None)
            if conn:
                conn.put_fs_handle(notify=lambda: self.cond.notifyAll())

    def _del_fs_handle(self, fs_name, wait=False):
        conn = self.connections.pop(fs_name, None)
        if conn:
            conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())

    def del_fs_handle(self, fs_name, wait=False):
        with self.lock:
            self._del_fs_handle(fs_name, wait)

    def del_all_handles(self):
        with self.lock:
            for fs_name in list(self.connections.keys()):
                log.info("waiting for pending ops for '{}'".format(fs_name))
                self._del_fs_handle(fs_name, wait=True)
                log.info("pending ops completed for '{}'".format(fs_name))
            # no new connections should have been initialized since it's
            # guarded on shutdown.
            assert len(self.connections) == 0


def gen_pool_names(volname):
    """
    return metadata and data pool names (derived from the filesystem/volume name) as a tuple
    """
    return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)
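
# For example, gen_pool_names("vol1") returns the pair
# ("cephfs.vol1.meta", "cephfs.vol1.data").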


def create_volume(mgr, volname, placement):
    """
    create volume (pool, filesystem and mds)
    """
    metadata_pool, data_pool = gen_pool_names(volname)
    r, outb, outs = create_pool(mgr, metadata_pool)
    if r != 0:
        return r, outb, outs
    r, outb, outs = create_pool(mgr, data_pool)
    if r != 0:
        # cleanup
        remove_pool(mgr, metadata_pool)
        return r, outb, outs
    r, outb, outs = create_filesystem(mgr, volname, metadata_pool, data_pool)
    if r != 0:
        log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
        # cleanup
        remove_pool(mgr, data_pool)
        remove_pool(mgr, metadata_pool)
        return r, outb, outs
    return create_mds(mgr, volname, placement)
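
# Rough caller sketch (hypothetical handler code, not part of this module):
# create_volume() returns the usual mgr command triple (r, outb, outs), with
# r == 0 on success, so a command handler might do:
#
#   r, outb, outs = create_volume(mgr, "vol1", placement=None)
#   if r != 0:
#       log.error("volume creation failed: {0}".format(outs))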


def delete_volume(mgr, volname):
    """
    delete the given volume (tear down mds, remove filesystem)
    """
    # Tear down MDS daemons
    try:
        completion = mgr.remove_service('mds.' + volname)
        mgr._orchestrator_wait([completion])
        orchestrator.raise_if_exception(completion)
    except (ImportError, orchestrator.OrchestratorError):
        log.warning("OrchestratorError, not tearing down MDS daemons")
    except Exception as e:
        # Don't let detailed orchestrator exceptions (python backtraces)
        # bubble out to the user
        log.exception("Failed to tear down MDS daemons")
        return -errno.EINVAL, "", str(e)

    # In case the orchestrator didn't tear down MDS daemons cleanly, or
    # there was no orchestrator, we force the daemons down.
    if volume_exists(mgr, volname):
        r, outb, outs = remove_filesystem(mgr, volname)
        if r != 0:
            return r, outb, outs
    else:
        err = "Filesystem not found for volume '{0}'".format(volname)
        log.warning(err)
        return -errno.ENOENT, "", err
    metadata_pool, data_pool = gen_pool_names(volname)
    r, outb, outs = remove_pool(mgr, metadata_pool)
    if r != 0:
        return r, outb, outs
    return remove_pool(mgr, data_pool)


def list_volumes(mgr):
    """
    list all filesystem volumes.
    """
    result = []
    fs_map = mgr.get("fs_map")
    for f in fs_map['filesystems']:
        result.append({'name': f['mdsmap']['fs_name']})
    return result


@contextmanager
def open_volume(vc, volname):
    """
    open a volume for exclusive access. This API is to be used as a context manager.

    :param vc: volume client instance
    :param volname: volume name
    :return: yields a volume handle (ceph filesystem handle)
    """
    if vc.is_stopping():
        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")

    g_lock = GlobalLock()
    fs_handle = vc.connection_pool.get_fs_handle(volname)
    try:
        with g_lock.lock_op():
            yield fs_handle
    finally:
        vc.connection_pool.put_fs_handle(volname)
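
# Minimal usage sketch for the context manager above. It assumes a volume
# client instance 'vc' that owns 'connection_pool' and an existing volume
# named "vol1"; neither is defined in this module:
#
#   with open_volume(vc, "vol1") as fs_handle:
#       fs_handle.stat(b"/volumes")   # fs_handle is a mounted cephfs.LibCephFS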


@contextmanager
def open_volume_lockless(vc, volname):
    """
    open a volume with shared access. This API is to be used as a context manager.

    :param vc: volume client instance
    :param volname: volume name
    :return: yields a volume handle (ceph filesystem handle)
    """
    if vc.is_stopping():
        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")

    fs_handle = vc.connection_pool.get_fs_handle(volname)
    try:
        yield fs_handle
    finally:
        vc.connection_pool.put_fs_handle(volname)
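
# Same pattern as open_volume but without taking the global lock; a sketch
# using the same hypothetical 'vc' as above:
#
#   with open_volume_lockless(vc, "vol1") as fs_handle:
#       fs_handle.statfs(b"/")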