# ceph/src/pybind/mgr/mirroring/fs/snapshot_mirror.py
import base64
import errno
import json
import logging
import os
import pickle
import re
import stat
import threading
import uuid
from typing import Dict, Any

import cephfs
import rados

from mgr_util import RTimer, CephfsClient, open_filesystem, \
    CephfsConnectionException
from mgr_module import NotifyType
from .blocklist import blocklist
from .notify import Notifier, InstanceWatcher
from .utils import INSTANCE_ID_PREFIX, MIRROR_OBJECT_NAME, Finisher, \
    AsyncOpTracker, connect_to_filesystem, disconnect_from_filesystem
from .exception import MirrorException
from .dir_map.create import create_mirror_object
from .dir_map.load import load_dir_map, load_instances
from .dir_map.update import UpdateDirMapRequest, UpdateInstanceRequest
from .dir_map.policy import Policy
from .dir_map.state_transition import ActionType

log = logging.getLogger(__name__)

CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL = 1

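# Orientation note (a summary of the code below, not authoritative): FSPolicy
# tracks the directory-to-mirror-instance mapping for a single filesystem. It
# loads the persisted directory map, watches cephfs-mirror instances joining
# and leaving, and queues directory actions that a recurring timer (fired
# roughly every CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL seconds) turns
# into map updates and acquire/release notifications to the mirror daemons.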
class FSPolicy:
    class InstanceListener(InstanceWatcher.Listener):
        def __init__(self, fspolicy):
            self.fspolicy = fspolicy

        def handle_instances(self, added, removed):
            self.fspolicy.update_instances(added, removed)

    def __init__(self, mgr, ioctx):
        self.mgr = mgr
        self.ioctx = ioctx
        self.pending = []
        self.policy = Policy()
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.dir_paths = []
        self.async_requests = {}
        self.finisher = Finisher()
        self.op_tracker = AsyncOpTracker()
        self.notifier = Notifier(ioctx)
        self.instance_listener = FSPolicy.InstanceListener(self)
        self.instance_watcher = None
        self.stopping = threading.Event()
        self.timer_task = RTimer(CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL,
                                 self.process_updates)
        self.timer_task.start()

    def schedule_action(self, dir_paths):
        self.dir_paths.extend(dir_paths)

    def init(self, dir_mapping, instances):
        with self.lock:
            self.policy.init(dir_mapping)
            # we'll schedule action for all directories, so don't bother capturing
            # directory names here.
            self.policy.add_instances(list(instances.keys()), initial_update=True)
            self.instance_watcher = InstanceWatcher(self.ioctx, instances,
                                                    self.instance_listener)
            self.schedule_action(list(dir_mapping.keys()))

    def shutdown(self):
        with self.lock:
            log.debug('FSPolicy.shutdown')
            self.stopping.set()
            log.debug('canceling update timer task')
            self.timer_task.cancel()
            log.debug('update timer task canceled')
        if self.instance_watcher:
            log.debug('stopping instance watcher')
            self.instance_watcher.wait_and_stop()
            log.debug('instance watcher stopped')
        self.op_tracker.wait_for_ops()
        log.debug('FSPolicy.shutdown done')

    def handle_update_mapping(self, updates, removals, request_id, callback, r):
        log.info(f'handle_update_mapping: {updates} {removals} {request_id} {callback} {r}')
        with self.lock:
            try:
                self.async_requests.pop(request_id)
                if callback:
                    callback(updates, removals, r)
            finally:
                self.op_tracker.finish_async_op()

    def handle_update_instances(self, instances_added, instances_removed, request_id, r):
        log.info(f'handle_update_instances: {instances_added} {instances_removed} {request_id} {r}')
        with self.lock:
            try:
                self.async_requests.pop(request_id)
                if self.stopping.is_set():
                    log.debug('handle_update_instances: policy shutting down')
                    return
                schedules = []
                if instances_removed:
                    schedules.extend(self.policy.remove_instances(instances_removed))
                if instances_added:
                    schedules.extend(self.policy.add_instances(instances_added))
                self.schedule_action(schedules)
            finally:
                self.op_tracker.finish_async_op()

    def update_mapping(self, update_map, removals, callback=None):
        log.info(f'updating directory map: {len(update_map)}+{len(removals)} updates')
        request_id = str(uuid.uuid4())
        def async_callback(r):
            self.finisher.queue(self.handle_update_mapping,
                                [list(update_map.keys()), removals, request_id, callback, r])
        request = UpdateDirMapRequest(self.ioctx, update_map.copy(), removals.copy(), async_callback)
        self.async_requests[request_id] = request
        self.op_tracker.start_async_op()
        log.debug(f'async request_id: {request_id}')
        request.send()

    def update_instances(self, added, removed):
        log.debug(f'update_instances: added={added}, removed={removed}')
        for instance_id, addr in removed.items():
            log.info(f'blocklisting instance_id: {instance_id} addr: {addr}')
            blocklist(self.mgr, addr)
        with self.lock:
            instances_added = {}
            instances_removed = []
            for instance_id, addr in added.items():
                instances_added[instance_id] = {'version': 1, 'addr': addr}
            instances_removed = list(removed.keys())
            request_id = str(uuid.uuid4())
            def async_callback(r):
                self.finisher.queue(self.handle_update_instances,
                                    [list(instances_added.keys()), instances_removed, request_id, r])
            # blocklisted instances can be removed at this point. remapping directories
            # mapped to blocklisted instances on module startup is handled in policy
            # add_instances().
            request = UpdateInstanceRequest(self.ioctx, instances_added.copy(),
                                            instances_removed.copy(), async_callback)
            self.async_requests[request_id] = request
            log.debug(f'async request_id: {request_id}')
            self.op_tracker.start_async_op()
            request.send()

    def continue_action(self, updates, removals, r):
        log.debug(f'continuing action: {updates}+{removals} r={r}')
        if self.stopping.is_set():
            log.debug('continue_action: policy shutting down')
            return
        schedules = []
        for dir_path in updates:
            schedule = self.policy.finish_action(dir_path, r)
            if schedule:
                schedules.append(dir_path)
        for dir_path in removals:
            schedule = self.policy.finish_action(dir_path, r)
            if schedule:
                schedules.append(dir_path)
        self.schedule_action(schedules)

    def handle_peer_ack(self, dir_path, r):
        log.info(f'handle_peer_ack: {dir_path} r={r}')
        with self.lock:
            try:
                if self.stopping.is_set():
                    log.debug('handle_peer_ack: policy shutting down')
                    return
                self.continue_action([dir_path], [], r)
            finally:
                self.op_tracker.finish_async_op()

    def process_updates(self):
        def acquire_message(dir_path):
            return json.dumps({'dir_path': dir_path,
                               'mode': 'acquire'
                               })
        def release_message(dir_path):
            return json.dumps({'dir_path': dir_path,
                               'mode': 'release'
                               })
        with self.lock:
            if not self.dir_paths or self.stopping.is_set():
                return
            update_map = {}
            removals = []
            notifies = {}
            instance_purges = []
            for dir_path in self.dir_paths:
                action_type = self.policy.start_action(dir_path)
                lookup_info = self.policy.lookup(dir_path)
                log.debug(f'processing action: dir_path: {dir_path}, lookup_info: {lookup_info}, action_type: {action_type}')
                if action_type == ActionType.NONE:
                    continue
                elif action_type == ActionType.MAP_UPDATE:
                    # take care to not overwrite purge status
                    update_map[dir_path] = {'version': 1,
                                            'instance_id': lookup_info['instance_id'],
                                            'last_shuffled': lookup_info['mapped_time']
                                            }
                    if lookup_info['purging']:
                        update_map[dir_path]['purging'] = 1
                elif action_type == ActionType.MAP_REMOVE:
                    removals.append(dir_path)
                elif action_type == ActionType.ACQUIRE:
                    notifies[dir_path] = (lookup_info['instance_id'], acquire_message(dir_path))
                elif action_type == ActionType.RELEASE:
                    notifies[dir_path] = (lookup_info['instance_id'], release_message(dir_path))
            if update_map or removals:
                self.update_mapping(update_map, removals, callback=self.continue_action)
            for dir_path, message in notifies.items():
                self.op_tracker.start_async_op()
                self.notifier.notify(dir_path, message, self.handle_peer_ack)
            self.dir_paths.clear()
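    # Note on process_updates() above: each notification is the owning
    # instance id paired with a plain JSON payload, e.g. (illustrative,
    # hypothetical path and id):
    #   ('<instance_id>', '{"dir_path": "/volumes/grp/subvol", "mode": "acquire"}')
    # The mirror daemon instance acquires or releases the directory
    # accordingly, and the acknowledgement is handled in handle_peer_ack().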

    def add_dir(self, dir_path):
        with self.lock:
            lookup_info = self.policy.lookup(dir_path)
            if lookup_info:
                if lookup_info['purging']:
                    raise MirrorException(-errno.EAGAIN, f'remove in-progress for {dir_path}')
                else:
                    raise MirrorException(-errno.EEXIST, f'directory {dir_path} is already tracked')
            schedule = self.policy.add_dir(dir_path)
            if not schedule:
                return
            update_map = {dir_path: {'version': 1, 'instance_id': '', 'last_shuffled': 0.0}}
            updated = False
            def update_safe(updates, removals, r):
                nonlocal updated
                updated = True
                self.cond.notify_all()
            self.update_mapping(update_map, [], callback=update_safe)
            self.cond.wait_for(lambda: updated)
            self.schedule_action([dir_path])

    def remove_dir(self, dir_path):
        with self.lock:
            lookup_info = self.policy.lookup(dir_path)
            if not lookup_info:
                raise MirrorException(-errno.ENOENT, f'directory {dir_path} is not tracked')
            if lookup_info['purging']:
                raise MirrorException(-errno.EINVAL, f'directory {dir_path} is under removal')
            update_map = {dir_path: {'version': 1,
                                     'instance_id': lookup_info['instance_id'],
                                     'last_shuffled': lookup_info['mapped_time'],
                                     'purging': 1}}
            updated = False
            sync_lock = threading.Lock()
            sync_cond = threading.Condition(sync_lock)
            def update_safe(r):
                with sync_lock:
                    nonlocal updated
                    updated = True
                    sync_cond.notify_all()
            request = UpdateDirMapRequest(self.ioctx, update_map.copy(), [], update_safe)
            request.send()
            with sync_lock:
                sync_cond.wait_for(lambda: updated)
            schedule = self.policy.remove_dir(dir_path)
            if schedule:
                self.schedule_action([dir_path])

    def status(self, dir_path):
        with self.lock:
            res = self.policy.dir_status(dir_path)
            return 0, json.dumps(res, indent=4, sort_keys=True), ''

    def summary(self):
        with self.lock:
            res = self.policy.instance_summary()
            return 0, json.dumps(res, indent=4, sort_keys=True), ''
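    # status() and summary() above return the (retcode, stdout, stderr)
    # triple expected of mgr module command handlers, with the JSON result
    # carried in stdout.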

class FSSnapshotMirror:
    PEER_CONFIG_KEY_PREFIX = "cephfs/mirror/peer"

    def __init__(self, mgr):
        self.mgr = mgr
        self.rados = mgr.rados
        self.pool_policy = {}
        self.fs_map = self.mgr.get('fs_map')
        self.lock = threading.Lock()
        self.refresh_pool_policy()
        self.local_fs = CephfsClient(mgr)

    def notify(self, notify_type: NotifyType):
        log.debug(f'got notify type {notify_type}')
        if notify_type == NotifyType.fs_map:
            with self.lock:
                self.fs_map = self.mgr.get('fs_map')
                self.refresh_pool_policy_locked()

    @staticmethod
    def make_spec(client_name, cluster_name):
        return f'{client_name}@{cluster_name}'

    @staticmethod
    def split_spec(spec):
        try:
            client_id, cluster_name = spec.split('@')
            _, client_name = client_id.split('.')
            return client_name, cluster_name
        except ValueError:
            raise MirrorException(-errno.EINVAL, f'invalid cluster spec {spec}')
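    # Illustrative example (hypothetical names): a remote cluster spec such
    # as 'client.mirror_remote@ceph_remote' splits into
    # ('mirror_remote', 'ceph_remote'); a spec without exactly one '@' and a
    # single '.' in the client part raises -EINVAL above.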

    @staticmethod
    def get_metadata_pool(filesystem, fs_map):
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['mdsmap']['metadata_pool']
        return None

    @staticmethod
    def get_filesystem_id(filesystem, fs_map):
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['id']
        return None

    @staticmethod
    def peer_config_key(filesystem, peer_uuid):
        return f'{FSSnapshotMirror.PEER_CONFIG_KEY_PREFIX}/{filesystem}/{peer_uuid}'
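    # The resulting mon config-key has the form (placeholders, not real
    # values):
    #   cephfs/mirror/peer/<fs_name>/<peer_uuid>
    # and is where peer_add() below stashes the remote mon_host/key for a
    # peer.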

    def config_set(self, key, val=None):
        """set or remove a key from mon config store"""
        if val:
            cmd = {'prefix': 'config-key set',
                   'key': key, 'val': val}
        else:
            cmd = {'prefix': 'config-key rm',
                   'key': key}
        r, outs, err = self.mgr.mon_command(cmd)
        if r < 0:
            log.error(f'mon command to set/remove config-key {key} failed: {err}')
            raise Exception(-errno.EINVAL)

    def config_get(self, key):
        """fetch a config key value from mon config store"""
        cmd = {'prefix': 'config-key get', 'key': key}
        r, outs, err = self.mgr.mon_command(cmd)
        if r < 0 and r != -errno.ENOENT:
            log.error(f'mon command to get config-key {key} failed: {err}')
            raise Exception(-errno.EINVAL)
        val = {}
        if r == 0:
            val = json.loads(outs)
        return val
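    # config_set()/config_get() above wrap the monitor's config-key store;
    # they are roughly equivalent to the CLI commands
    #   ceph config-key set <key> <value>
    #   ceph config-key rm <key>
    #   ceph config-key get <key>
    # with a missing key surfacing as -ENOENT, which config_get() treats as
    # an empty dict.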

    def filesystem_exist(self, filesystem):
        for fs in self.fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return True
        return False

    def get_mirrored_filesystems(self):
        return [fs['mdsmap']['fs_name'] for fs in self.fs_map['filesystems'] if fs.get('mirror_info', None)]

    def get_filesystem_peers(self, filesystem):
        """To be used when mirroring is enabled for the filesystem"""
        for fs in self.fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['mirror_info']['peers']
        return None

    def peer_exists(self, filesystem, remote_cluster_spec, remote_fs_name):
        peers = self.get_filesystem_peers(filesystem)
        for _, rem in peers.items():
            remote = rem['remote']
            spec = FSSnapshotMirror.make_spec(remote['client_name'], remote['cluster_name'])
            if spec == remote_cluster_spec and remote['fs_name'] == remote_fs_name:
                return True
        return False

    @staticmethod
    def get_mirror_info(fs):
        try:
            val = fs.getxattr('/', 'ceph.mirror.info')
            match = re.search(r'^cluster_id=([a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}) fs_id=(\d+)$',
                              val.decode('utf-8'))
            if match and len(match.groups()) == 2:
                return {'cluster_id': match.group(1),
                        'fs_id': int(match.group(2))
                        }
            raise MirrorException(-errno.EINVAL, 'invalid ceph.mirror.info value format')
        except cephfs.Error as e:
            raise MirrorException(-e.errno, 'error fetching ceph.mirror.info xattr')
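    # The 'ceph.mirror.info' xattr parsed above is a single line of the form
    #   cluster_id=<cluster fsid (8-4-4-4-12 lowercase hex uuid)> fs_id=<fscid>
    # which is exactly what set_mirror_info() below writes on the remote
    # filesystem root.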

    @staticmethod
    def set_mirror_info(local_cluster_id, local_fsid, remote_fs):
        log.info(f'setting {local_cluster_id}::{local_fsid} on remote')
        try:
            remote_fs.setxattr('/', 'ceph.mirror.info',
                               f'cluster_id={local_cluster_id} fs_id={local_fsid}'.encode('utf-8'), os.XATTR_CREATE)
        except cephfs.Error as e:
            if e.errno == errno.EEXIST:
                try:
                    mi = FSSnapshotMirror.get_mirror_info(remote_fs)
                    cluster_id = mi['cluster_id']
                    fs_id = mi['fs_id']
                    if not (cluster_id == local_cluster_id and fs_id == local_fsid):
                        raise MirrorException(-errno.EEXIST, f'peer mirrored by: (cluster_id: {cluster_id}, fs_id: {fs_id})')
                except MirrorException:
                    # if mirror info cannot be fetched for some reason, let's just
                    # fail.
                    raise MirrorException(-errno.EEXIST, 'already an active peer')
            else:
                log.error(f'error setting mirrored fsid: {e}')
                raise Exception(-e.errno)

    def resolve_peer(self, fs_name, peer_uuid):
        peers = self.get_filesystem_peers(fs_name)
        for peer, rem in peers.items():
            if peer == peer_uuid:
                return rem['remote']
        return None

    def purge_mirror_info(self, local_fs_name, peer_uuid):
        log.debug(f'local fs={local_fs_name} peer_uuid={peer_uuid}')
        # resolve the peer to its spec
        rem = self.resolve_peer(local_fs_name, peer_uuid)
        if not rem:
            return
        log.debug(f'peer_uuid={peer_uuid} resolved to {rem}')
        _, client_name = rem['client_name'].split('.')

        # fetch auth details from config store
        remote_conf = self.config_get(FSSnapshotMirror.peer_config_key(local_fs_name, peer_uuid))
        remote_cluster, remote_fs = connect_to_filesystem(client_name,
                                                          rem['cluster_name'],
                                                          rem['fs_name'], 'remote', conf_dct=remote_conf)
        try:
            remote_fs.removexattr('/', 'ceph.mirror.info')
        except cephfs.Error as e:
            if e.errno != errno.ENOENT:
                log.error('error removing mirror info')
                raise Exception(-e.errno)
        finally:
            disconnect_from_filesystem(rem['cluster_name'], rem['fs_name'], remote_cluster, remote_fs)

    def verify_and_set_mirror_info(self, local_fs_name, remote_cluster_spec, remote_fs_name, remote_conf={}):
        log.debug(f'local fs={local_fs_name} remote={remote_cluster_spec}/{remote_fs_name}')

        client_name, cluster_name = FSSnapshotMirror.split_spec(remote_cluster_spec)
        remote_cluster, remote_fs = connect_to_filesystem(client_name, cluster_name, remote_fs_name,
                                                          'remote', conf_dct=remote_conf)
        try:
            local_cluster_id = self.rados.get_fsid()
            remote_cluster_id = remote_cluster.get_fsid()
            log.debug(f'local_cluster_id={local_cluster_id} remote_cluster_id={remote_cluster_id}')
            if 'fsid' in remote_conf:
                if remote_cluster_id != remote_conf['fsid']:
                    raise MirrorException(-errno.EINVAL, 'FSID mismatch between bootstrap token and remote cluster')

            local_fscid = remote_fscid = None
            with open_filesystem(self.local_fs, local_fs_name) as local_fsh:
                local_fscid = local_fsh.get_fscid()
                remote_fscid = remote_fs.get_fscid()
                log.debug(f'local_fscid={local_fscid} remote_fscid={remote_fscid}')
                mi = None
                try:
                    mi = FSSnapshotMirror.get_mirror_info(local_fsh)
                except MirrorException as me:
                    if me.args[0] != -errno.ENODATA:
                        raise Exception(-errno.EINVAL)
                if mi and mi['cluster_id'] == remote_cluster_id and mi['fs_id'] == remote_fscid:
                    raise MirrorException(-errno.EINVAL, f'file system is an active peer for file system: {remote_fs_name}')

            if local_cluster_id == remote_cluster_id and local_fscid == remote_fscid:
                raise MirrorException(-errno.EINVAL, "source and destination cluster fsid and "
                                      "file-system name can't be the same")
            FSSnapshotMirror.set_mirror_info(local_cluster_id, local_fscid, remote_fs)
        finally:
            disconnect_from_filesystem(cluster_name, remote_fs_name, remote_cluster, remote_fs)

    def init_pool_policy(self, filesystem):
        metadata_pool_id = FSSnapshotMirror.get_metadata_pool(filesystem, self.fs_map)
        if not metadata_pool_id:
            log.error(f'cannot find metadata pool-id for filesystem {filesystem}')
            raise Exception(-errno.EINVAL)
        try:
            ioctx = self.rados.open_ioctx2(metadata_pool_id)
            # TODO: make async if required
            dir_mapping = load_dir_map(ioctx)
            instances = load_instances(ioctx)
            # init policy
            fspolicy = FSPolicy(self.mgr, ioctx)
            log.debug(f'init policy for filesystem {filesystem}: pool-id {metadata_pool_id}')
            fspolicy.init(dir_mapping, instances)
            self.pool_policy[filesystem] = fspolicy
        except rados.Error as e:
            log.error(f'failed to access pool-id {metadata_pool_id} for filesystem {filesystem}: {e}')
            raise Exception(-e.errno)

    def refresh_pool_policy_locked(self):
        filesystems = self.get_mirrored_filesystems()
        log.debug(f'refreshing policy for {filesystems}')
        for filesystem in list(self.pool_policy):
            if filesystem not in filesystems:
                log.info(f'shutdown pool policy for {filesystem}')
                fspolicy = self.pool_policy.pop(filesystem)
                fspolicy.shutdown()
        for filesystem in filesystems:
            if filesystem not in self.pool_policy:
                log.info(f'init pool policy for {filesystem}')
                self.init_pool_policy(filesystem)

    def refresh_pool_policy(self):
        with self.lock:
            self.refresh_pool_policy_locked()

    def enable_mirror(self, filesystem):
        log.info(f'enabling mirror for filesystem {filesystem}')
        with self.lock:
            try:
                metadata_pool_id = FSSnapshotMirror.get_metadata_pool(filesystem, self.fs_map)
                if not metadata_pool_id:
                    log.error(f'cannot find metadata pool-id for filesystem {filesystem}')
                    raise Exception(-errno.EINVAL)
                create_mirror_object(self.rados, metadata_pool_id)
                cmd = {'prefix': 'fs mirror enable', 'fs_name': filesystem}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to enable mirror failed: {err}')
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
            except MirrorException as me:
                return me.args[0], '', me.args[1]
            except Exception as e:
                return e.args[0], '', 'failed to enable mirroring'

    def disable_mirror(self, filesystem):
        log.info(f'disabling mirror for filesystem {filesystem}')
        try:
            with self.lock:
                cmd = {'prefix': 'fs mirror disable', 'fs_name': filesystem}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to disable mirror failed: {err}')
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to disable mirroring'

    def peer_list(self, filesystem):
        try:
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                peers = self.get_filesystem_peers(filesystem)
                peer_res = {}
                for peer_uuid, rem in peers.items():
                    conf = self.config_get(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid))
                    remote = rem['remote']
                    peer_res[peer_uuid] = {'client_name': remote['client_name'],
                                           'site_name': remote['cluster_name'],
                                           'fs_name': remote['fs_name']
                                           }
                    if 'mon_host' in conf:
                        peer_res[peer_uuid]['mon_host'] = conf['mon_host']
                return 0, json.dumps(peer_res), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to list peers'

    def peer_add(self, filesystem, remote_cluster_spec, remote_fs_name, remote_conf):
        try:
            if remote_fs_name is None:
                remote_fs_name = filesystem
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                ### peer updates for key, site-name are not yet supported
                if self.peer_exists(filesystem, remote_cluster_spec, remote_fs_name):
                    return 0, json.dumps({}), ''
                # _own_ the peer
                self.verify_and_set_mirror_info(filesystem, remote_cluster_spec, remote_fs_name, remote_conf)
                # unique peer uuid
                peer_uuid = str(uuid.uuid4())
                config_key = FSSnapshotMirror.peer_config_key(filesystem, peer_uuid)
                if remote_conf.get('mon_host') and remote_conf.get('key'):
                    self.config_set(config_key, json.dumps(remote_conf))
                cmd = {'prefix': 'fs mirror peer_add',
                       'fs_name': filesystem,
                       'uuid': peer_uuid,
                       'remote_cluster_spec': remote_cluster_spec,
                       'remote_fs_name': remote_fs_name}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to add peer failed: {err}')
                    try:
                        log.debug(f'cleaning up config-key for {peer_uuid}')
                        self.config_set(config_key)
                    except Exception:
                        pass
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to add peer'

    def peer_remove(self, filesystem, peer_uuid):
        try:
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                # ok, this is being a bit lazy. remove mirror info from peer followed
                # by purging the peer from fsmap. if the mirror daemon fs map updates
                # are laggy, they happily continue to synchronize. ideally, we should
                # purge the peer from fsmap here and purge mirror info on fsmap update
                # (in notify()). but that's not straightforward -- before purging mirror
                # info, we would need to wait for all mirror daemons to catch up with
                # fsmap updates. this involves mirror daemons sending the fsmap epoch
                # they have seen in reply to a notify request. TODO: fix this.
                self.purge_mirror_info(filesystem, peer_uuid)
                cmd = {'prefix': 'fs mirror peer_remove',
                       'fs_name': filesystem,
                       'uuid': peer_uuid}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to remove peer failed: {err}')
                    raise Exception(-errno.EINVAL)
                self.config_set(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid))
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to remove peer'

    def peer_bootstrap_create(self, fs_name, client_name, site_name):
        """create a bootstrap token for this peer filesystem"""
        try:
            with self.lock:
                cmd = {'prefix': 'fs authorize',
                       'filesystem': fs_name,
                       'entity': client_name,
                       'caps': ['/', 'rwps']}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to create peer user failed: {err}')
                    raise Exception(-errno.EINVAL)
                cmd = {'prefix': 'auth get',
                       'entity': client_name,
                       'format': 'json'}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to fetch keyring failed: {err}')
                    raise Exception(-errno.EINVAL)
                outs = json.loads(outs)
                outs0 = outs[0]
                token_dct = {'fsid': self.mgr.rados.get_fsid(),
                             'filesystem': fs_name,
                             'user': outs0['entity'],
                             'site_name': site_name,
                             'key': outs0['key'],
                             'mon_host': self.mgr.rados.conf_get('mon_host')}
                token_str = json.dumps(token_dct).encode('utf-8')
                encoded_token = base64.b64encode(token_str)
                return 0, json.dumps({'token': encoded_token.decode('utf-8')}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to bootstrap peer'
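    # The bootstrap token returned above is base64-encoded JSON carrying the
    # fields built in token_dct: fsid, filesystem, user, site_name, key and
    # mon_host (all values are cluster specific). peer_bootstrap_import()
    # below reverses this: it decodes the token, pops 'user', 'site_name' and
    # 'filesystem', and passes the remaining fields (fsid, key, mon_host) to
    # peer_add() as the remote connection configuration.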

    def peer_bootstrap_import(self, filesystem, token):
        try:
            token_str = base64.b64decode(token)
            token_dct = json.loads(token_str.decode('utf-8'))
        except Exception:
            return -errno.EINVAL, '', 'failed to parse token'
        client_name = token_dct.pop('user')
        cluster_name = token_dct.pop('site_name')
        remote_fs_name = token_dct.pop('filesystem')
        remote_cluster_spec = f'{client_name}@{cluster_name}'
        return self.peer_add(filesystem, remote_cluster_spec, remote_fs_name, token_dct)

    @staticmethod
    def norm_path(dir_path):
        if not os.path.isabs(dir_path):
            raise MirrorException(-errno.EINVAL, f'{dir_path} should be an absolute path')
        return os.path.normpath(dir_path)
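    # Illustrative behaviour of norm_path(): '/a/b/../c/' normalizes to
    # '/a/c', while a relative path such as 'a/b' raises -EINVAL.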

    def add_dir(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                log.debug(f'path normalized to {dir_path}')
                fspolicy.add_dir(dir_path)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to add directory'

    def remove_dir(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                fspolicy.remove_dir(dir_path)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to remove directory'

    def status(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                return fspolicy.status(dir_path)
        except MirrorException as me:
            return me.args[0], '', me.args[1]

    def show_distribution(self, filesystem):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                return fspolicy.summary()
        except MirrorException as me:
            return me.args[0], '', me.args[1]

    def daemon_status(self):
        try:
            with self.lock:
                daemons = []
                sm = self.mgr.get('service_map')
                daemon_entry = sm['services'].get('cephfs-mirror', None)
                log.debug(f'daemon_entry: {daemon_entry}')
                if daemon_entry is not None:
                    for daemon_key in daemon_entry.get('daemons', []):
                        try:
                            daemon_id = int(daemon_key)
                        except ValueError:
                            continue
                        daemon = {
                            'daemon_id': daemon_id,
                            'filesystems': []
                        }  # type: Dict[str, Any]
                        daemon_status = self.mgr.get_daemon_status('cephfs-mirror', daemon_key)
                        if not daemon_status:
                            log.debug(f'daemon status not yet available for cephfs-mirror daemon: {daemon_key}')
                            continue
                        status = json.loads(daemon_status['status_json'])
                        for fs_id, fs_desc in status.items():
                            fs = {'filesystem_id': int(fs_id),
                                  'name': fs_desc['name'],
                                  'directory_count': fs_desc['directory_count'],
                                  'peers': []
                                  }  # type: Dict[str, Any]
                            for peer_uuid, peer_desc in fs_desc['peers'].items():
                                peer = {
                                    'uuid': peer_uuid,
                                    'remote': peer_desc['remote'],
                                    'stats': peer_desc['stats']
                                }
                                fs['peers'].append(peer)
                            daemon['filesystems'].append(fs)
                        daemons.append(daemon)
                return 0, json.dumps(daemons), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
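    # Illustrative shape of the JSON returned by daemon_status() above (all
    # values are hypothetical):
    #   [{"daemon_id": 12345,
    #     "filesystems": [{"filesystem_id": 1,
    #                      "name": "a",
    #                      "directory_count": 4,
    #                      "peers": [{"uuid": "<peer uuid>",
    #                                 "remote": {...},
    #                                 "stats": {...}}]}]}]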