# ceph/src/pybind/mgr/mirroring/fs/snapshot_mirror.py

import base64
import errno
import json
import logging
import os
import pickle
import re
import stat
import threading
import uuid

import cephfs
import rados

from mgr_util import RTimer, CephfsClient, open_filesystem,\
    CephfsConnectionException
from .blocklist import blocklist
from .notify import Notifier, InstanceWatcher
from .utils import INSTANCE_ID_PREFIX, MIRROR_OBJECT_NAME, Finisher, \
    AsyncOpTracker, connect_to_filesystem, disconnect_from_filesystem
from .exception import MirrorException
from .dir_map.create import create_mirror_object
from .dir_map.load import load_dir_map, load_instances
from .dir_map.update import UpdateDirMapRequest, UpdateInstanceRequest
from .dir_map.policy import Policy
from .dir_map.state_transition import ActionType

log = logging.getLogger(__name__)

CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL = 1

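# FSPolicy drives the directory-to-mirror-daemon assignment for a single
# mirror-enabled filesystem: it keeps the Policy state machine, persists the
# directory map in the filesystem's metadata pool (via the supplied ioctx),
# and notifies cephfs-mirror instances to acquire or release directories.
# Updates are batched and flushed by an RTimer every
# CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL second(s).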
class FSPolicy:
    class InstanceListener(InstanceWatcher.Listener):
        def __init__(self, fspolicy):
            self.fspolicy = fspolicy

        def handle_instances(self, added, removed):
            self.fspolicy.update_instances(added, removed)

    def __init__(self, mgr, ioctx):
        self.mgr = mgr
        self.ioctx = ioctx
        self.pending = []
        self.policy = Policy()
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.dir_paths = []
        self.async_requests = {}
        self.finisher = Finisher()
        self.op_tracker = AsyncOpTracker()
        self.notifier = Notifier(ioctx)
        self.instance_listener = FSPolicy.InstanceListener(self)
        self.instance_watcher = None
        self.stopping = threading.Event()
        self.timer_task = RTimer(CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL,
                                 self.process_updates)
        self.timer_task.start()

    def schedule_action(self, dir_paths):
        self.dir_paths.extend(dir_paths)

    def init(self, dir_mapping, instances):
        with self.lock:
            self.policy.init(dir_mapping)
            # we'll schedule action for all directories, so don't bother capturing
            # directory names here.
            self.policy.add_instances(list(instances.keys()), initial_update=True)
            self.instance_watcher = InstanceWatcher(self.ioctx, instances,
                                                    self.instance_listener)
            self.schedule_action(list(dir_mapping.keys()))

    def shutdown(self):
        with self.lock:
            log.debug('FSPolicy.shutdown')
            self.stopping.set()
            log.debug('canceling update timer task')
            self.timer_task.cancel()
            log.debug('update timer task canceled')
        if self.instance_watcher:
            log.debug('stopping instance watcher')
            self.instance_watcher.wait_and_stop()
            log.debug('instance watcher stopped')
        self.op_tracker.wait_for_ops()
        log.debug('FSPolicy.shutdown done')

    def handle_update_mapping(self, updates, removals, request_id, callback, r):
        log.info(f'handle_update_mapping: {updates} {removals} {request_id} {callback} {r}')
        with self.lock:
            try:
                self.async_requests.pop(request_id)
                if callback:
                    callback(updates, removals, r)
            finally:
                self.op_tracker.finish_async_op()

    def handle_update_instances(self, instances_added, instances_removed, request_id, r):
        log.info(f'handle_update_instances: {instances_added} {instances_removed} {request_id} {r}')
        with self.lock:
            try:
                self.async_requests.pop(request_id)
                if self.stopping.is_set():
                    log.debug('handle_update_instances: policy shutting down')
                    return
                schedules = []
                if instances_removed:
                    schedules.extend(self.policy.remove_instances(instances_removed))
                if instances_added:
                    schedules.extend(self.policy.add_instances(instances_added))
                self.schedule_action(schedules)
            finally:
                self.op_tracker.finish_async_op()

    def update_mapping(self, update_map, removals, callback=None):
        log.info(f'updating directory map: {len(update_map)}+{len(removals)} updates')
        request_id = str(uuid.uuid4())
        def async_callback(r):
            self.finisher.queue(self.handle_update_mapping,
                                [list(update_map.keys()), removals, request_id, callback, r])
        request = UpdateDirMapRequest(self.ioctx, update_map.copy(), removals.copy(), async_callback)
        self.async_requests[request_id] = request
        self.op_tracker.start_async_op()
        log.debug(f'async request_id: {request_id}')
        request.send()

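    # Called from the InstanceWatcher listener when cephfs-mirror daemons come
    # and go. Departed instances are blocklisted first so they cannot keep
    # synchronizing, then the on-disk instance map is updated asynchronously
    # via UpdateInstanceRequest; handle_update_instances() reshuffles the
    # affected directories once that update completes.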
    def update_instances(self, added, removed):
        log.debug(f'update_instances: added={added}, removed={removed}')
        for instance_id, addr in removed.items():
            log.info(f'blocklisting instance_id: {instance_id} addr: {addr}')
            blocklist(self.mgr, addr)
        with self.lock:
            instances_added = {}
            instances_removed = []
            for instance_id, addr in added.items():
                instances_added[instance_id] = {'version': 1, 'addr': addr}
            instances_removed = list(removed.keys())
            request_id = str(uuid.uuid4())
            def async_callback(r):
                self.finisher.queue(self.handle_update_instances,
                                    [list(instances_added.keys()), instances_removed, request_id, r])
            # blocklisted instances can be removed at this point. remapping directories
            # mapped to blocklisted instances on module startup is handled in policy
            # add_instances().
            request = UpdateInstanceRequest(self.ioctx, instances_added.copy(),
                                            instances_removed.copy(), async_callback)
            self.async_requests[request_id] = request
            log.debug(f'async request_id: {request_id}')
            self.op_tracker.start_async_op()
            request.send()

    def continue_action(self, updates, removals, r):
        log.debug(f'continuing action: {updates}+{removals} r={r}')
        if self.stopping.is_set():
            log.debug('continue_action: policy shutting down')
            return
        schedules = []
        for dir_path in updates:
            schedule = self.policy.finish_action(dir_path, r)
            if schedule:
                schedules.append(dir_path)
        for dir_path in removals:
            schedule = self.policy.finish_action(dir_path, r)
            if schedule:
                schedules.append(dir_path)
        self.schedule_action(schedules)

    def handle_peer_ack(self, dir_path, r):
        log.info(f'handle_peer_ack: {dir_path} r={r}')
        with self.lock:
            try:
                if self.stopping.is_set():
                    log.debug('handle_peer_ack: policy shutting down')
                    return
                self.continue_action([dir_path], [], r)
            finally:
                self.op_tracker.finish_async_op()

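    # process_updates() runs on the throttle timer: it drains self.dir_paths,
    # asks the Policy for the next action per directory and batches the
    # results -- directory map updates/removals are persisted through
    # update_mapping(), while ACQUIRE/RELEASE actions become notifications to
    # the owning cephfs-mirror instance. A directory map record, as built
    # below, looks roughly like (illustrative values):
    #   {'version': 1, 'instance_id': '4112', 'last_shuffled': 1234567890.0, 'purging': 1}
    # where 'purging' is only present for directories queued for removal.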
    def process_updates(self):
        def acquire_message(dir_path):
            return json.dumps({'dir_path': dir_path,
                               'mode': 'acquire'
                               })
        def release_message(dir_path):
            return json.dumps({'dir_path': dir_path,
                               'mode': 'release'
                               })
        with self.lock:
            if not self.dir_paths or self.stopping.is_set():
                return
            update_map = {}
            removals = []
            notifies = {}
            instance_purges = []
            for dir_path in self.dir_paths:
                action_type = self.policy.start_action(dir_path)
                lookup_info = self.policy.lookup(dir_path)
                log.debug(f'processing action: dir_path: {dir_path}, lookup_info: {lookup_info}, action_type: {action_type}')
                if action_type == ActionType.NONE:
                    continue
                elif action_type == ActionType.MAP_UPDATE:
                    # take care to not overwrite purge status
                    update_map[dir_path] = {'version': 1,
                                            'instance_id': lookup_info['instance_id'],
                                            'last_shuffled': lookup_info['mapped_time']
                                            }
                    if lookup_info['purging']:
                        update_map[dir_path]['purging'] = 1
                elif action_type == ActionType.MAP_REMOVE:
                    removals.append(dir_path)
                elif action_type == ActionType.ACQUIRE:
                    notifies[dir_path] = (lookup_info['instance_id'], acquire_message(dir_path))
                elif action_type == ActionType.RELEASE:
                    notifies[dir_path] = (lookup_info['instance_id'], release_message(dir_path))
            if update_map or removals:
                self.update_mapping(update_map, removals, callback=self.continue_action)
            for dir_path, message in notifies.items():
                self.op_tracker.start_async_op()
                self.notifier.notify(dir_path, message, self.handle_peer_ack)
            self.dir_paths.clear()

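    # add_dir()/remove_dir() are the synchronous entry points used by the
    # manager command handlers: they persist the directory map change first
    # (waiting on a condition variable for the update callback) and only then
    # schedule the policy action that maps or unmaps the directory.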
    def add_dir(self, dir_path):
        with self.lock:
            lookup_info = self.policy.lookup(dir_path)
            if lookup_info:
                if lookup_info['purging']:
                    raise MirrorException(-errno.EAGAIN, f'remove in-progress for {dir_path}')
                else:
                    raise MirrorException(-errno.EEXIST, f'directory {dir_path} is already tracked')
            schedule = self.policy.add_dir(dir_path)
            if not schedule:
                return
            update_map = {dir_path: {'version': 1, 'instance_id': '', 'last_shuffled': 0.0}}
            updated = False
            def update_safe(updates, removals, r):
                nonlocal updated
                updated = True
                self.cond.notify_all()
            self.update_mapping(update_map, [], callback=update_safe)
            self.cond.wait_for(lambda: updated)
            self.schedule_action([dir_path])

    def remove_dir(self, dir_path):
        with self.lock:
            lookup_info = self.policy.lookup(dir_path)
            if not lookup_info:
                raise MirrorException(-errno.ENOENT, f'directory {dir_path} is not tracked')
            if lookup_info['purging']:
                raise MirrorException(-errno.EINVAL, f'directory {dir_path} is under removal')
            update_map = {dir_path: {'version': 1,
                                     'instance_id': lookup_info['instance_id'],
                                     'last_shuffled': lookup_info['mapped_time'],
                                     'purging': 1}}
            updated = False
            sync_lock = threading.Lock()
            sync_cond = threading.Condition(sync_lock)
            def update_safe(r):
                with sync_lock:
                    nonlocal updated
                    updated = True
                    sync_cond.notify_all()
            request = UpdateDirMapRequest(self.ioctx, update_map.copy(), [], update_safe)
            request.send()
            with sync_lock:
                sync_cond.wait_for(lambda: updated)
            schedule = self.policy.remove_dir(dir_path)
            if schedule:
                self.schedule_action([dir_path])

    def status(self, dir_path):
        with self.lock:
            res = self.policy.dir_status(dir_path)
            return 0, json.dumps(res, indent=4, sort_keys=True), ''

    def summary(self):
        with self.lock:
            res = self.policy.instance_summary()
            return 0, json.dumps(res, indent=4, sort_keys=True), ''

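# FSSnapshotMirror is the interface exposed to the mirroring manager module:
# it tracks the fs_map, maintains one FSPolicy per mirror-enabled filesystem
# (keyed by filesystem name in self.pool_policy), and wraps the mon commands
# for enabling/disabling mirroring and for peer management. Peer credentials
# are kept in the mon config-key store under
# cephfs/mirror/peer/<filesystem>/<peer_uuid>.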
class FSSnapshotMirror:
    PEER_CONFIG_KEY_PREFIX = "cephfs/mirror/peer"

    def __init__(self, mgr):
        self.mgr = mgr
        self.rados = mgr.rados
        self.pool_policy = {}
        self.fs_map = self.mgr.get('fs_map')
        self.lock = threading.Lock()
        self.refresh_pool_policy()
        self.local_fs = CephfsClient(mgr)

    def notify(self, notify_type):
        log.debug(f'got notify type {notify_type}')
        if notify_type == 'fs_map':
            with self.lock:
                self.fs_map = self.mgr.get('fs_map')
                self.refresh_pool_policy_locked()

    @staticmethod
    def make_spec(client_name, cluster_name):
        return f'{client_name}@{cluster_name}'

    @staticmethod
    def split_spec(spec):
        try:
            client_id, cluster_name = spec.split('@')
            _, client_name = client_id.split('.')
            return client_name, cluster_name
        except ValueError:
            raise MirrorException(-errno.EINVAL, f'invalid cluster spec {spec}')

    @staticmethod
    def get_metadata_pool(filesystem, fs_map):
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['mdsmap']['metadata_pool']
        return None

    @staticmethod
    def get_filesystem_id(filesystem, fs_map):
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['id']
        return None

    @staticmethod
    def peer_config_key(filesystem, peer_uuid):
        return f'{FSSnapshotMirror.PEER_CONFIG_KEY_PREFIX}/{filesystem}/{peer_uuid}'

    def config_set(self, key, val=None):
        """set or remove a key from the mon config store"""
        if val:
            cmd = {'prefix': 'config-key set',
                   'key': key, 'val': val}
        else:
            cmd = {'prefix': 'config-key rm',
                   'key': key}
        r, outs, err = self.mgr.mon_command(cmd)
        if r < 0:
            log.error(f'mon command to set/remove config-key {key} failed: {err}')
            raise Exception(-errno.EINVAL)

    def config_get(self, key):
        """fetch a config key value from the mon config store"""
        cmd = {'prefix': 'config-key get', 'key': key}
        r, outs, err = self.mgr.mon_command(cmd)
        if r < 0 and not r == -errno.ENOENT:
            log.error(f'mon command to get config-key {key} failed: {err}')
            raise Exception(-errno.EINVAL)
        val = {}
        if r == 0:
            val = json.loads(outs)
        return val

    def filesystem_exist(self, filesystem):
        for fs in self.fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return True
        return False

    def get_mirrored_filesystems(self):
        return [fs['mdsmap']['fs_name'] for fs in self.fs_map['filesystems'] if fs.get('mirror_info', None)]

    def get_filesystem_peers(self, filesystem):
        """To be used when mirroring is enabled for the filesystem"""
        for fs in self.fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['mirror_info']['peers']
        return None

    def peer_exists(self, filesystem, remote_cluster_spec, remote_fs_name):
        peers = self.get_filesystem_peers(filesystem)
        for _, rem in peers.items():
            remote = rem['remote']
            spec = FSSnapshotMirror.make_spec(remote['client_name'], remote['cluster_name'])
            if spec == remote_cluster_spec and remote['fs_name'] == remote_fs_name:
                return True
        return False

    def get_mirror_info(self, remote_fs):
        try:
            val = remote_fs.getxattr('/', 'ceph.mirror.info')
            match = re.search(r'^cluster_id=([a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}) fs_id=(\d+)$',
                              val.decode('utf-8'))
            if match and len(match.groups()) == 2:
                return {'cluster_id': match.group(1),
                        'fs_id': int(match.group(2))
                        }
            return None
        except cephfs.Error:
            return None

    def set_mirror_info(self, local_cluster_id, local_fsid, remote_fs):
        log.info(f'setting {local_cluster_id}::{local_fsid} on remote')
        try:
            remote_fs.setxattr('/', 'ceph.mirror.info',
                               f'cluster_id={local_cluster_id} fs_id={local_fsid}'.encode('utf-8'), os.XATTR_CREATE)
        except cephfs.Error as e:
            if e.errno == errno.EEXIST:
                mi = self.get_mirror_info(remote_fs)
                if not mi:
                    log.error('error fetching mirror info when setting mirror info')
                    raise Exception(-errno.EINVAL)
                cluster_id = mi['cluster_id']
                fs_id = mi['fs_id']
                if not (cluster_id == local_cluster_id and fs_id == local_fsid):
                    raise MirrorException(-errno.EEXIST, f'peer mirrored by: (cluster_id: {cluster_id}, fs_id: {fs_id})')
            else:
                log.error(f'error setting mirrored fsid: {e}')
                raise Exception(-e.errno)

    def resolve_peer(self, fs_name, peer_uuid):
        peers = self.get_filesystem_peers(fs_name)
        for peer, rem in peers.items():
            if peer == peer_uuid:
                return rem['remote']
        return None

    def purge_mirror_info(self, local_fs_name, peer_uuid):
        log.debug(f'local fs={local_fs_name} peer_uuid={peer_uuid}')
        # resolve the peer to its spec
        rem = self.resolve_peer(local_fs_name, peer_uuid)
        if not rem:
            return
        log.debug(f'peer_uuid={peer_uuid} resolved to {rem}')
        _, client_name = rem['client_name'].split('.')

        # fetch auth details from config store
        remote_conf = self.config_get(FSSnapshotMirror.peer_config_key(local_fs_name, peer_uuid))
        remote_cluster, remote_fs = connect_to_filesystem(client_name,
                                                          rem['cluster_name'],
                                                          rem['fs_name'], 'remote', conf_dct=remote_conf)
        try:
            remote_fs.removexattr('/', 'ceph.mirror.info')
        except cephfs.Error as e:
            if not e.errno == errno.ENOENT:
                log.error('error removing mirror info')
                raise Exception(-e.errno)
        finally:
            disconnect_from_filesystem(rem['cluster_name'], rem['fs_name'], remote_cluster, remote_fs)

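    # Peer "ownership" is recorded as a 'ceph.mirror.info' xattr on the remote
    # filesystem's root, holding the local cluster id and filesystem id (see
    # set_mirror_info()/get_mirror_info() above). verify_and_set_mirror_info()
    # connects to the remote, checks the FSID carried in the bootstrap token
    # (when present) and rejects mirroring a filesystem onto itself before
    # claiming the peer.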
    def verify_and_set_mirror_info(self, local_fs_name, remote_cluster_spec, remote_fs_name, remote_conf={}):
        log.debug(f'local fs={local_fs_name} remote={remote_cluster_spec}/{remote_fs_name}')

        client_name, cluster_name = FSSnapshotMirror.split_spec(remote_cluster_spec)
        remote_cluster, remote_fs = connect_to_filesystem(client_name, cluster_name, remote_fs_name,
                                                          'remote', conf_dct=remote_conf)
        if 'fsid' in remote_conf:
            if not remote_cluster.get_fsid() == remote_conf['fsid']:
                raise MirrorException(-errno.EINVAL, 'FSID mismatch between bootstrap token and remote cluster')

        local_fsid = FSSnapshotMirror.get_filesystem_id(local_fs_name, self.fs_map)
        if local_fsid is None:
            log.error(f'error looking up filesystem id for {local_fs_name}')
            raise Exception(-errno.EINVAL)

        # post cluster id comparison, filesystem name comparison would suffice
        local_cluster_id = self.rados.get_fsid()
        remote_cluster_id = remote_cluster.get_fsid()
        log.debug(f'local_cluster_id={local_cluster_id} remote_cluster_id={remote_cluster_id}')
        if local_cluster_id == remote_cluster_id and local_fs_name == remote_fs_name:
            raise MirrorException(-errno.EINVAL, "source and destination cluster fsid and "
                                  "file-system name can't be the same")

        try:
            self.set_mirror_info(local_cluster_id, local_fsid, remote_fs)
        finally:
            disconnect_from_filesystem(cluster_name, remote_fs_name, remote_cluster, remote_fs)

    def init_pool_policy(self, filesystem):
        metadata_pool_id = FSSnapshotMirror.get_metadata_pool(filesystem, self.fs_map)
        if not metadata_pool_id:
            log.error(f'cannot find metadata pool-id for filesystem {filesystem}')
            raise Exception(-errno.EINVAL)
        try:
            ioctx = self.rados.open_ioctx2(metadata_pool_id)
            # TODO: make async if required
            dir_mapping = load_dir_map(ioctx)
            instances = load_instances(ioctx)
            # init policy
            fspolicy = FSPolicy(self.mgr, ioctx)
            log.debug(f'init policy for filesystem {filesystem}: pool-id {metadata_pool_id}')
            fspolicy.init(dir_mapping, instances)
            self.pool_policy[filesystem] = fspolicy
        except rados.Error as e:
            log.error(f'failed to access pool-id {metadata_pool_id} for filesystem {filesystem}: {e}')
            raise Exception(-e.errno)

    def refresh_pool_policy_locked(self):
        filesystems = self.get_mirrored_filesystems()
        log.debug(f'refreshing policy for {filesystems}')
        for filesystem in list(self.pool_policy):
            if filesystem not in filesystems:
                log.info(f'shutdown pool policy for {filesystem}')
                fspolicy = self.pool_policy.pop(filesystem)
                fspolicy.shutdown()
        for filesystem in filesystems:
            if filesystem not in self.pool_policy:
                log.info(f'init pool policy for {filesystem}')
                self.init_pool_policy(filesystem)

    def refresh_pool_policy(self):
        with self.lock:
            self.refresh_pool_policy_locked()

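    # enable_mirror()/disable_mirror() create the mirror object in the
    # filesystem's metadata pool and toggle mirroring in the FSMap via the
    # 'fs mirror enable'/'fs mirror disable' mon commands. These handlers are
    # typically reached through the module's 'fs snapshot mirror enable|disable'
    # manager commands (the exact command wiring lives in the module's command
    # table, not in this file).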
    def enable_mirror(self, filesystem):
        log.info(f'enabling mirror for filesystem {filesystem}')
        with self.lock:
            try:
                metadata_pool_id = FSSnapshotMirror.get_metadata_pool(filesystem, self.fs_map)
                if not metadata_pool_id:
                    log.error(f'cannot find metadata pool-id for filesystem {filesystem}')
                    raise Exception(-errno.EINVAL)
                create_mirror_object(self.rados, metadata_pool_id)
                cmd = {'prefix': 'fs mirror enable', 'fs_name': filesystem}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to enable mirror failed: {err}')
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
            except MirrorException as me:
                return me.args[0], '', me.args[1]
            except Exception as e:
                return e.args[0], '', 'failed to enable mirroring'

    def disable_mirror(self, filesystem):
        log.info(f'disabling mirror for filesystem {filesystem}')
        try:
            with self.lock:
                cmd = {'prefix': 'fs mirror disable', 'fs_name': filesystem}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to disable mirror failed: {err}')
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to disable mirroring'

    def peer_list(self, filesystem):
        try:
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                peers = self.get_filesystem_peers(filesystem)
                peer_res = {}
                for peer_uuid, rem in peers.items():
                    conf = self.config_get(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid))
                    remote = rem['remote']
                    peer_res[peer_uuid] = {'client_name': remote['client_name'],
                                           'site_name': remote['cluster_name'],
                                           'fs_name': remote['fs_name']
                                           }
                    if 'mon_host' in conf:
                        peer_res[peer_uuid]['mon_host'] = conf['mon_host']
                return 0, json.dumps(peer_res), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to list peers'

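    # peer_add() is idempotent for an existing (spec, fs_name) pair: it claims
    # the remote via verify_and_set_mirror_info(), stores the peer's mon_host
    # and key (when supplied, e.g. from a bootstrap token) in the config-key
    # store, and then registers the peer in the FSMap with 'fs mirror peer_add'.
    # On mon command failure the config-key is cleaned up again.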
    def peer_add(self, filesystem, remote_cluster_spec, remote_fs_name, remote_conf):
        try:
            if remote_fs_name is None:
                remote_fs_name = filesystem
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                ### peer updates for key, site-name are not yet supported
                if self.peer_exists(filesystem, remote_cluster_spec, remote_fs_name):
                    return 0, json.dumps({}), ''
                # _own_ the peer
                self.verify_and_set_mirror_info(filesystem, remote_cluster_spec, remote_fs_name, remote_conf)
                # unique peer uuid
                peer_uuid = str(uuid.uuid4())
                config_key = FSSnapshotMirror.peer_config_key(filesystem, peer_uuid)
                if remote_conf.get('mon_host') and remote_conf.get('key'):
                    self.config_set(config_key, json.dumps(remote_conf))
                cmd = {'prefix': 'fs mirror peer_add',
                       'fs_name': filesystem,
                       'uuid': peer_uuid,
                       'remote_cluster_spec': remote_cluster_spec,
                       'remote_fs_name': remote_fs_name}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to add peer failed: {err}')
                    try:
                        log.debug(f'cleaning up config-key for {peer_uuid}')
                        self.config_set(config_key)
                    except Exception:
                        pass
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to add peer'

    def peer_remove(self, filesystem, peer_uuid):
        try:
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                # ok, this is being a bit lazy. remove mirror info from peer followed
                # by purging the peer from fsmap. if the mirror daemon fs map updates
                # are laggy, they happily continue to synchronize. ideally, we should
                # purge the peer from fsmap here and purge mirror info on fsmap update
                # (in notify()). but that's not straightforward -- before purging mirror
                # info, we would need to wait for all mirror daemons to catch up with
                # fsmap updates. this involves mirror daemons sending the fsmap epoch
                # they have seen in reply to a notify request. TODO: fix this.
                self.purge_mirror_info(filesystem, peer_uuid)
                cmd = {'prefix': 'fs mirror peer_remove',
                       'fs_name': filesystem,
                       'uuid': peer_uuid}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to remove peer failed: {err}')
                    raise Exception(-errno.EINVAL)
                self.config_set(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid))
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to remove peer'

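    # The bootstrap token produced below is base64-encoded JSON roughly of the
    # form (illustrative values):
    #   {"fsid": "<cluster fsid>", "filesystem": "<fs_name>",
    #    "user": "client.mirror_remote", "site_name": "<site_name>",
    #    "key": "<cephx key>", "mon_host": "<mon addrs>"}
    # peer_bootstrap_import() decodes it and feeds the fields into peer_add().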
    def peer_bootstrap_create(self, fs_name, client_name, site_name):
        """create a bootstrap token for this peer filesystem"""
        try:
            with self.lock:
                cmd = {'prefix': 'fs authorize',
                       'filesystem': fs_name,
                       'entity': client_name,
                       'caps': ['/', 'rwps']}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to create peer user failed: {err}')
                    raise Exception(-errno.EINVAL)
                cmd = {'prefix': 'auth get',
                       'entity': client_name,
                       'format': 'json'}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to fetch keyring failed: {err}')
                    raise Exception(-errno.EINVAL)
                outs = json.loads(outs)
                outs0 = outs[0]
                token_dct = {'fsid': self.mgr.rados.get_fsid(),
                             'filesystem': fs_name,
                             'user': outs0['entity'],
                             'site_name': site_name,
                             'key': outs0['key'],
                             'mon_host': self.mgr.rados.conf_get('mon_host')}
                token_str = json.dumps(token_dct).encode('utf-8')
                encoded_token = base64.b64encode(token_str)
                return 0, json.dumps({'token': encoded_token.decode('utf-8')}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to bootstrap peer'

    def peer_bootstrap_import(self, filesystem, token):
        try:
            token_str = base64.b64decode(token)
            token_dct = json.loads(token_str.decode('utf-8'))
        except Exception:
            return -errno.EINVAL, '', 'failed to parse token'
        client_name = token_dct.pop('user')
        cluster_name = token_dct.pop('site_name')
        remote_fs_name = token_dct.pop('filesystem')
        remote_cluster_spec = f'{client_name}@{cluster_name}'
        return self.peer_add(filesystem, remote_cluster_spec, remote_fs_name, token_dct)

    @staticmethod
    def norm_path(dir_path):
        if not os.path.isabs(dir_path):
            raise MirrorException(-errno.EINVAL, f'{dir_path} should be an absolute path')
        return os.path.normpath(dir_path)

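    # The remaining handlers implement the directory and status commands. Each
    # returns the manager module's usual (retval, stdout, stderr) triple:
    # add_dir()/remove_dir() validate the filesystem and normalize the path
    # before delegating to the per-filesystem FSPolicy, while status()/
    # show_distribution() just serialize the policy's view as JSON.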
    def add_dir(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                log.debug(f'path normalized to {dir_path}')
                fspolicy.add_dir(dir_path)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to add directory'

    def remove_dir(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                fspolicy.remove_dir(dir_path)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to remove directory'

    def status(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                return fspolicy.status(dir_path)
        except MirrorException as me:
            return me.args[0], '', me.args[1]

    def show_distribution(self, filesystem):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                return fspolicy.summary()
        except MirrorException as me:
            return me.args[0], '', me.args[1]

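    # daemon_status() reports the cephfs-mirror daemons known to the service
    # map: for each registered daemon it fetches the status blob published by
    # the daemon (the 'status_json' field) and returns the decoded statuses
    # keyed by daemon id. Missing entries are expected briefly after a daemon
    # (re)starts and are simply skipped.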
    def daemon_status(self, filesystem):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                daemons = {}
                sm = self.mgr.get('service_map')
                daemon_entry = sm['services'].get('cephfs-mirror', None)
                if daemon_entry:
                    for daemon_key in daemon_entry['daemons']:
                        try:
                            daemon_id = int(daemon_key)
                            daemon_status = self.mgr.get_daemon_status('cephfs-mirror', daemon_key)
                            if not daemon_status:
                                # temporary, should get updated soon
                                log.debug(f'daemon status not yet available for daemon_id {daemon_id}')
                                continue
                            try:
                                daemons[daemon_id] = json.loads(daemon_status['status_json'])
                            except KeyError:
                                # temporary, should get updated soon
                                log.debug(f'daemon status not yet available for daemon_id {daemon_id}')
                        except ValueError:
                            pass
                return 0, json.dumps(daemons), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]