# ceph/src/pybind/mgr/volumes/fs/operations/volume.py

import time
import errno
import logging

from contextlib import contextmanager
from threading import Lock, Condition
try:
    # py2
    from threading import _Timer as Timer
except ImportError:
    # py3
    from threading import Timer

import cephfs
import orchestrator

from .lock import GlobalLock
from ..exception import VolumeException
from ..fs_util import create_pool, remove_pool, create_filesystem, \
    remove_filesystem, create_mds, volume_exists

log = logging.getLogger(__name__)


class ConnectionPool(object):
    class Connection(object):
        def __init__(self, mgr, fs_name):
            self.fs = None
            self.mgr = mgr
            self.fs_name = fs_name
            self.ops_in_progress = 0
            self.last_used = time.time()
            self.fs_id = self.get_fs_id()

        def get_fs_id(self):
            fs_map = self.mgr.get('fs_map')
            for fs in fs_map['filesystems']:
                if fs['mdsmap']['fs_name'] == self.fs_name:
                    return fs['id']
            raise VolumeException(
                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))

        def get_fs_handle(self):
            self.last_used = time.time()
            self.ops_in_progress += 1
            return self.fs

        def put_fs_handle(self, notify):
            assert self.ops_in_progress > 0
            self.ops_in_progress -= 1
            if self.ops_in_progress == 0:
                notify()

        def del_fs_handle(self, waiter):
            if waiter:
                while self.ops_in_progress != 0:
                    waiter()
            if self.is_connection_valid():
                self.disconnect()
            else:
                self.abort()

        def is_connection_valid(self):
            fs_id = None
            try:
                fs_id = self.get_fs_id()
            except:
                # the filesystem does not exist now -- connection is not valid.
                pass
            log.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
            return self.fs_id == fs_id

        def is_connection_idle(self, timeout):
            return (self.ops_in_progress == 0 and
                    ((time.time() - self.last_used) >= timeout))

        def connect(self):
            assert self.ops_in_progress == 0
            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
            log.debug("Setting user ID and group ID of CephFS mount as root...")
            self.fs.conf_set("client_mount_uid", "0")
            self.fs.conf_set("client_mount_gid", "0")
            log.debug("CephFS initializing...")
            self.fs.init()
            log.debug("CephFS mounting...")
            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))

        def disconnect(self):
            try:
                assert self.ops_in_progress == 0
                log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
                self.fs.shutdown()
                self.fs = None
            except Exception as e:
                log.debug("disconnect: ({0})".format(e))
                raise

        def abort(self):
            assert self.ops_in_progress == 0
            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
            self.fs.abort_conn()
            log.info("abort done from cephfs '{0}'".format(self.fs_name))
            self.fs = None

    class RTimer(Timer):
        """
        recurring timer variant of Timer
        """
        def run(self):
            try:
                while not self.finished.is_set():
                    self.finished.wait(self.interval)
                    self.function(*self.args, **self.kwargs)
                self.finished.set()
            except Exception as e:
                log.error("ConnectionPool.RTimer: %s", e)
                raise

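    # Note: unlike the one-shot threading.Timer it derives from, RTimer keeps
    # re-arming itself until 'finished' is set, so it is stopped the same way a
    # Timer is (illustrative only; 'some_callback' is a hypothetical function):
    #
    #   timer = ConnectionPool.RTimer(30.0, some_callback)
    #   timer.start()
    #   ...
    #   timer.cancel()    # sets 'finished' and ends the loop
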
    # TODO: make this configurable
    TIMER_TASK_RUN_INTERVAL = 30.0   # seconds
    CONNECTION_IDLE_INTERVAL = 60.0  # seconds

    def __init__(self, mgr):
        self.mgr = mgr
        self.connections = {}
        self.lock = Lock()
        self.cond = Condition(self.lock)
        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                                self.cleanup_connections)
        self.timer_task.start()

    def cleanup_connections(self):
        with self.lock:
            log.info("scanning for idle connections..")
            idle_fs = [fs_name for fs_name, conn in self.connections.items()
                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
            for fs_name in idle_fs:
                log.info("cleaning up connection for '{}'".format(fs_name))
                self._del_fs_handle(fs_name)

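    # cleanup_connections() runs every TIMER_TASK_RUN_INTERVAL seconds (via the
    # RTimer started in __init__) and drops any connection that has had no
    # operations in progress for at least CONNECTION_IDLE_INTERVAL seconds.
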
    def get_fs_handle(self, fs_name):
        with self.lock:
            conn = None
            try:
                conn = self.connections.get(fs_name, None)
                if conn:
                    if conn.is_connection_valid():
                        return conn.get_fs_handle()
                    else:
                        # filesystem id changed beneath us (or the filesystem does not exist).
                        # this is possible if the filesystem got removed (and recreated with
                        # same name) via "ceph fs rm/new" mon command.
                        log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
                        self._del_fs_handle(fs_name)
                conn = ConnectionPool.Connection(self.mgr, fs_name)
                conn.connect()
            except cephfs.Error as e:
                # try to provide a better error string if possible
                if e.args[0] == errno.ENOENT:
                    raise VolumeException(
                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
                raise VolumeException(-e.args[0], e.args[1])
            self.connections[fs_name] = conn
            return conn.get_fs_handle()

    def put_fs_handle(self, fs_name):
        with self.lock:
            conn = self.connections.get(fs_name, None)
            if conn:
                conn.put_fs_handle(notify=lambda: self.cond.notifyAll())

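    # Usage contract implied above: every successful get_fs_handle() must be
    # balanced by a put_fs_handle() for the same volume, since the connection's
    # ops_in_progress counter is what lets del_fs_handle() (and the idle
    # cleanup) know when the connection can be torn down.
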
    def _del_fs_handle(self, fs_name, wait=False):
        conn = self.connections.pop(fs_name, None)
        if conn:
            conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())

    def del_fs_handle(self, fs_name, wait=False):
        with self.lock:
            self._del_fs_handle(fs_name, wait)

    def del_all_handles(self):
        with self.lock:
            for fs_name in list(self.connections.keys()):
                log.info("waiting for pending ops for '{}'".format(fs_name))
                self._del_fs_handle(fs_name, wait=True)
                log.info("pending ops completed for '{}'".format(fs_name))
            # no new connections should have been initialized since it's
            # guarded on shutdown.
            assert len(self.connections) == 0


def gen_pool_names(volname):
    """
    return metadata and data pool name (from a filesystem/volume name) as a tuple
    """
    return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)

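# For example, gen_pool_names("vol1") returns the tuple
# ("cephfs.vol1.meta", "cephfs.vol1.data") -- "vol1" is just an illustrative
# volume name.

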
def create_volume(mgr, volname):
    """
    create volume (pool, filesystem and mds)
    """
    metadata_pool, data_pool = gen_pool_names(volname)
    # create the metadata and data pools
    r, outb, outs = create_pool(mgr, metadata_pool, 16)
    if r != 0:
        return r, outb, outs
    r, outb, outs = create_pool(mgr, data_pool, 8)
    if r != 0:
        # cleanup
        remove_pool(mgr, metadata_pool)
        return r, outb, outs
    # create the filesystem
    r, outb, outs = create_filesystem(mgr, volname, metadata_pool, data_pool)
    if r != 0:
        log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
        # cleanup
        remove_pool(mgr, data_pool)
        remove_pool(mgr, metadata_pool)
        return r, outb, outs
    # create the MDS daemons for the volume
    return create_mds(mgr, volname)

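# Hypothetical caller sketch (the actual call site is outside this file): the
# handler for the "fs volume create" mgr command would do something like
#
#   r, outb, outs = create_volume(mgr, "vol1")   # "vol1" is illustrative
#
# and relay the (r, outb, outs) tuple back to the CLI.

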
def delete_volume(mgr, volname):
    """
    delete the given volume (tear down mds, remove filesystem)
    """
    # Tear down MDS daemons
    try:
        completion = mgr.remove_stateless_service("mds", volname)
        mgr._orchestrator_wait([completion])
        orchestrator.raise_if_exception(completion)
    except (ImportError, orchestrator.OrchestratorError):
        log.warning("OrchestratorError, not tearing down MDS daemons")
    except Exception as e:
        # Don't let detailed orchestrator exceptions (python backtraces)
        # bubble out to the user
        log.exception("Failed to tear down MDS daemons")
        return -errno.EINVAL, "", str(e)

    # In case orchestrator didn't tear down MDS daemons cleanly, or
    # there was no orchestrator, we force the daemons down.
    if volume_exists(mgr, volname):
        r, outb, outs = remove_filesystem(mgr, volname)
        if r != 0:
            return r, outb, outs
    else:
        err = "Filesystem not found for volume '{0}'".format(volname)
        log.warning(err)
        return -errno.ENOENT, "", err

    metadata_pool, data_pool = gen_pool_names(volname)
    r, outb, outs = remove_pool(mgr, metadata_pool)
    if r != 0:
        return r, outb, outs
    return remove_pool(mgr, data_pool)


def list_volumes(mgr):
    """
    list all filesystem volumes.

    :return: list of dicts, one per volume, each carrying the volume 'name'
    """
    result = []
    fs_map = mgr.get("fs_map")
    for f in fs_map['filesystems']:
        result.append({'name': f['mdsmap']['fs_name']})
    return result

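# For example, with two filesystems named "vol1" and "vol2" in the FSMap,
# list_volumes(mgr) returns [{'name': 'vol1'}, {'name': 'vol2'}] (the names
# are illustrative).

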
@contextmanager
def open_volume(vc, volname):
    """
    open a volume for exclusive access. This API is to be used as a context manager.

    :param vc: volume client instance
    :param volname: volume name
    :return: yields a volume handle (ceph filesystem handle)
    """
    if vc.is_stopping():
        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")

    g_lock = GlobalLock()
    fs_handle = vc.connection_pool.get_fs_handle(volname)
    try:
        with g_lock.lock_op():
            yield fs_handle
    finally:
        vc.connection_pool.put_fs_handle(volname)

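# A minimal usage sketch (assuming 'vc' is a volume client instance and "vol1"
# an existing volume):
#
#   with open_volume(vc, "vol1") as fs_handle:
#       fs_handle.stat(b"/")    # any libcephfs call, under the global lock
#
# The handle is returned to the connection pool when the block exits.

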
@contextmanager
def open_volume_lockless(vc, volname):
    """
    open a volume with shared access. This API is to be used as a context manager.

    :param vc: volume client instance
    :param volname: volume name
    :return: yields a volume handle (ceph filesystem handle)
    """
    if vc.is_stopping():
        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")

    fs_handle = vc.connection_pool.get_fs_handle(volname)
    try:
        yield fs_handle
    finally:
        vc.connection_pool.put_fs_handle(volname)