# ceph/src/pybind/mgr/volumes/fs/operations/volume.py
import time
import errno
import logging
import sys

from typing import List

from contextlib import contextmanager
from threading import Lock, Condition
from typing import no_type_check

if sys.version_info >= (3, 3):
    from threading import Timer
else:
    from threading import _Timer as Timer

import cephfs
import orchestrator

from .lock import GlobalLock
from ..exception import VolumeException
from ..fs_util import create_pool, remove_pool, create_filesystem, \
    remove_filesystem, create_mds, volume_exists

log = logging.getLogger(__name__)


class ConnectionPool(object):
    class Connection(object):
        def __init__(self, mgr, fs_name):
            self.fs = None
            self.mgr = mgr
            self.fs_name = fs_name
            self.ops_in_progress = 0
            self.last_used = time.time()
            self.fs_id = self.get_fs_id()

        def get_fs_id(self):
            fs_map = self.mgr.get('fs_map')
            for fs in fs_map['filesystems']:
                if fs['mdsmap']['fs_name'] == self.fs_name:
                    # matching filesystem found -- return its id from the FSMap entry
                    return fs['id']
            raise VolumeException(
                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))

        def get_fs_handle(self):
            self.last_used = time.time()
            self.ops_in_progress += 1
            return self.fs

        def put_fs_handle(self, notify):
            assert self.ops_in_progress > 0
            self.ops_in_progress -= 1
            if self.ops_in_progress == 0:
                notify()

        def del_fs_handle(self, waiter):
            if waiter:
                while self.ops_in_progress != 0:
                    waiter()
            if self.is_connection_valid():
                self.disconnect()
            else:
                self.abort()

        def is_connection_valid(self):
            fs_id = None
            try:
                fs_id = self.get_fs_id()
            except Exception:
                # the filesystem does not exist now -- connection is not valid.
                pass
            log.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
            return self.fs_id == fs_id

        def is_connection_idle(self, timeout):
            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))

        def connect(self):
            assert self.ops_in_progress == 0
            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
            log.debug("Setting user ID and group ID of CephFS mount as root...")
            self.fs.conf_set("client_mount_uid", "0")
            self.fs.conf_set("client_mount_gid", "0")
            log.debug("CephFS initializing...")
            self.fs.init()
            log.debug("CephFS mounting...")
            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
            self.mgr._ceph_register_client(self.fs.get_addrs())

        def disconnect(self):
            try:
                assert self.ops_in_progress == 0
                log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
                addrs = self.fs.get_addrs()
                self.fs.shutdown()
                self.mgr._ceph_unregister_client(addrs)
                self.fs = None
            except Exception as e:
                log.debug("disconnect: ({0})".format(e))
                raise

        def abort(self):
            assert self.ops_in_progress == 0
            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
            self.fs.abort_conn()
            log.info("abort done from cephfs '{0}'".format(self.fs_name))
            self.fs = None

    class RTimer(Timer):
        """
        recurring timer variant of Timer
        """
        @no_type_check
        def run(self):
            try:
                while not self.finished.is_set():
                    self.finished.wait(self.interval)
                    self.function(*self.args, **self.kwargs)
                self.finished.set()
            except Exception as e:
                log.error("ConnectionPool.RTimer: %s", e)
                raise

    # TODO: make this configurable
    TIMER_TASK_RUN_INTERVAL = 30.0   # seconds
    CONNECTION_IDLE_INTERVAL = 60.0  # seconds

    def __init__(self, mgr):
        self.mgr = mgr
        self.connections = {}
        self.lock = Lock()
        self.cond = Condition(self.lock)
        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                                self.cleanup_connections)
        self.timer_task.start()

    def cleanup_connections(self):
        with self.lock:
            log.info("scanning for idle connections..")
            idle_fs = [fs_name for fs_name, conn in self.connections.items()
                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
            for fs_name in idle_fs:
                log.info("cleaning up connection for '{}'".format(fs_name))
                self._del_fs_handle(fs_name)

    def get_fs_handle(self, fs_name):
        with self.lock:
            try:
                conn = self.connections.get(fs_name, None)
                if conn:
                    if conn.is_connection_valid():
                        return conn.get_fs_handle()
                    # filesystem id changed beneath us (or the filesystem does not exist).
                    # this is possible if the filesystem got removed (and recreated with
                    # same name) via "ceph fs rm/new" mon command.
                    log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
                    self._del_fs_handle(fs_name)
                conn = ConnectionPool.Connection(self.mgr, fs_name)
                conn.connect()
            except cephfs.Error as e:
                # try to provide a better error string if possible
                if e.args[0] == errno.ENOENT:
                    raise VolumeException(
                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
                raise VolumeException(-e.args[0], e.args[1])
            self.connections[fs_name] = conn
            return conn.get_fs_handle()

    def put_fs_handle(self, fs_name):
        with self.lock:
            conn = self.connections.get(fs_name, None)
            if conn:
                conn.put_fs_handle(notify=lambda: self.cond.notifyAll())

    def _del_fs_handle(self, fs_name, wait=False):
        conn = self.connections.pop(fs_name, None)
        if conn:
            conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())

    def del_fs_handle(self, fs_name, wait=False):
        with self.lock:
            self._del_fs_handle(fs_name, wait)

    def del_all_handles(self):
        with self.lock:
            for fs_name in list(self.connections.keys()):
                log.info("waiting for pending ops for '{}'".format(fs_name))
                self._del_fs_handle(fs_name, wait=True)
                log.info("pending ops completed for '{}'".format(fs_name))
            # no new connections should have been initialized since it's
            # guarded on shutdown.
            assert len(self.connections) == 0
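

# Illustrative sketch (not part of the module): how a caller holding a mgr module
# instance might drive the pool. The 'mgr' object and the volume name 'vol1' are
# hypothetical placeholders.
#
#     pool = ConnectionPool(mgr)
#     fs = pool.get_fs_handle('vol1')    # connect (or reuse) a libcephfs handle
#     try:
#         fs.stat(b'/')                  # any libcephfs operation
#     finally:
#         pool.put_fs_handle('vol1')     # drop the op count; the idle timer may reap it later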


def gen_pool_names(volname):
    """
    return metadata and data pool name (from a filesystem/volume name) as a tuple
    """
    return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)
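
# For example, gen_pool_names("vol1") yields ("cephfs.vol1.meta", "cephfs.vol1.data").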


def get_pool_names(mgr, volname):
    """
    return metadata and data pools (list) names of volume as a tuple
    """
    fs_map = mgr.get("fs_map")
    metadata_pool_id = None
    data_pool_ids = []  # type: List[int]
    for f in fs_map['filesystems']:
        if volname == f['mdsmap']['fs_name']:
            metadata_pool_id = f['mdsmap']['metadata_pool']
            data_pool_ids = f['mdsmap']['data_pools']
            break
    if metadata_pool_id is None:
        # volume not found in the FSMap
        return None, None
    osdmap = mgr.get("osd_map")
    pools = dict([(p['pool'], p['pool_name']) for p in osdmap['pools']])
    metadata_pool = pools[metadata_pool_id]
    data_pools = [pools[id] for id in data_pool_ids]
    return metadata_pool, data_pools


def create_volume(mgr, volname, placement):
    """
    create volume (pool, filesystem and mds)
    """
    metadata_pool, data_pool = gen_pool_names(volname)
    # create pools
    r, outb, outs = create_pool(mgr, metadata_pool)
    if r != 0:
        return r, outb, outs
    r, outb, outs = create_pool(mgr, data_pool)
    if r != 0:
        # cleanup
        remove_pool(mgr, metadata_pool)
        return r, outb, outs
    # create filesystem
    r, outb, outs = create_filesystem(mgr, volname, metadata_pool, data_pool)
    if r != 0:
        log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
        # cleanup
        remove_pool(mgr, data_pool)
        remove_pool(mgr, metadata_pool)
        return r, outb, outs
    # create mds
    return create_mds(mgr, volname, placement)


def delete_volume(mgr, volname, metadata_pool, data_pools):
    """
    delete the given volume (tear down mds, remove filesystem, remove pools)
    """
    # Tear down MDS daemons
    try:
        completion = mgr.remove_service('mds.' + volname)
        mgr._orchestrator_wait([completion])
        orchestrator.raise_if_exception(completion)
    except (ImportError, orchestrator.OrchestratorError):
        log.warning("OrchestratorError, not tearing down MDS daemons")
    except Exception as e:
        # Don't let detailed orchestrator exceptions (python backtraces)
        # bubble out to the user
        log.exception("Failed to tear down MDS daemons")
        return -errno.EINVAL, "", str(e)

    # In case orchestrator didn't tear down MDS daemons cleanly, or
    # there was no orchestrator, we force the daemons down.
    if volume_exists(mgr, volname):
        r, outb, outs = remove_filesystem(mgr, volname)
        if r != 0:
            return r, outb, outs
    else:
        err = "Filesystem not found for volume '{0}'".format(volname)
        log.warning(err)
        return -errno.ENOENT, "", err

    r, outb, outs = remove_pool(mgr, metadata_pool)
    if r != 0:
        return r, outb, outs

    for data_pool in data_pools:
        r, outb, outs = remove_pool(mgr, data_pool)
        if r != 0:
            return r, outb, outs
    result_str = "metadata pool: {0} data pool: {1} removed".format(metadata_pool, str(data_pools))
    return r, result_str, ""


def list_volumes(mgr):
    """
    list all filesystem volumes.
    """
    result = []
    fs_map = mgr.get("fs_map")
    for f in fs_map['filesystems']:
        result.append({'name': f['mdsmap']['fs_name']})
    return result
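
# Each entry is a dict keyed by 'name', e.g. [{'name': 'a'}, {'name': 'b'}]
# for a cluster with two filesystems named 'a' and 'b'.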


@contextmanager
def open_volume(vc, volname):
    """
    open a volume for exclusive access. This API is to be used as a context manager.

    :param vc: volume client instance
    :param volname: volume name
    :return: yields a volume handle (ceph filesystem handle)
    """
    if vc.is_stopping():
        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")

    g_lock = GlobalLock()
    fs_handle = vc.connection_pool.get_fs_handle(volname)
    try:
        with g_lock.lock_op():
            yield fs_handle
    finally:
        vc.connection_pool.put_fs_handle(volname)
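
# Illustrative usage sketch (the volume client 'vc' and volume name are hypothetical):
#
#     with open_volume(vc, "vol1") as fs_handle:
#         fs_handle.mkdirs(b"/some/dir", 0o755)
#
# The global lock is held for the duration of the block, and the libcephfs handle is
# returned to the connection pool when the block exits.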


@contextmanager
def open_volume_lockless(vc, volname):
    """
    open a volume with shared access. This API is to be used as a context manager.

    :param vc: volume client instance
    :param volname: volume name
    :return: yields a volume handle (ceph filesystem handle)
    """
    if vc.is_stopping():
        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")

    fs_handle = vc.connection_pool.get_fs_handle(volname)
    try:
        yield fs_handle
    finally:
        vc.connection_pool.put_fs_handle(volname)
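
# Usage mirrors open_volume(), but no GlobalLock is taken: access is shared rather
# than exclusive, so callers must tolerate concurrent operations on the volume.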