# ceph/src/pybind/mgr/mirroring/fs/snapshot_mirror.py (ceph v16.2.6)
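"""Snapshot mirroring management for the CephFS 'mirroring' mgr plugin.

FSPolicy tracks the directory-to-mirror-instance assignment for a single
mirrored filesystem and persists it (together with the set of known
cephfs-mirror instances) in the filesystem's metadata pool; FSSnapshotMirror
sits on top of it and implements the per-filesystem operations: enabling and
disabling mirroring, peer management and bootstrap, directory add/remove,
and status reporting.
"""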
import base64
import errno
import json
import logging
import os
import pickle
import re
import stat
import threading
import uuid
from typing import Dict, Any

import cephfs
import rados

from mgr_util import RTimer, CephfsClient, open_filesystem,\
    CephfsConnectionException
from .blocklist import blocklist
from .notify import Notifier, InstanceWatcher
from .utils import INSTANCE_ID_PREFIX, MIRROR_OBJECT_NAME, Finisher, \
    AsyncOpTracker, connect_to_filesystem, disconnect_from_filesystem
from .exception import MirrorException
from .dir_map.create import create_mirror_object
from .dir_map.load import load_dir_map, load_instances
from .dir_map.update import UpdateDirMapRequest, UpdateInstanceRequest
from .dir_map.policy import Policy
from .dir_map.state_transition import ActionType

log = logging.getLogger(__name__)

CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL = 1

class FSPolicy:
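    """Directory-to-instance mapping policy for one mirrored filesystem.

    A periodic RTimer drives process_updates(), which turns pending policy
    actions into dir-map updates in the metadata pool and acquire/release
    notifications to cephfs-mirror instances.
    """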
    class InstanceListener(InstanceWatcher.Listener):
        def __init__(self, fspolicy):
            self.fspolicy = fspolicy

        def handle_instances(self, added, removed):
            self.fspolicy.update_instances(added, removed)

    def __init__(self, mgr, ioctx):
        self.mgr = mgr
        self.ioctx = ioctx
        self.pending = []
        self.policy = Policy()
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.dir_paths = []
        self.async_requests = {}
        self.finisher = Finisher()
        self.op_tracker = AsyncOpTracker()
        self.notifier = Notifier(ioctx)
        self.instance_listener = FSPolicy.InstanceListener(self)
        self.instance_watcher = None
        self.stopping = threading.Event()
        self.timer_task = RTimer(CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL,
                                 self.process_updates)
        self.timer_task.start()

    def schedule_action(self, dir_paths):
        self.dir_paths.extend(dir_paths)

    def init(self, dir_mapping, instances):
        with self.lock:
            self.policy.init(dir_mapping)
            # we'll schedule action for all directories, so don't bother capturing
            # directory names here.
            self.policy.add_instances(list(instances.keys()), initial_update=True)
            self.instance_watcher = InstanceWatcher(self.ioctx, instances,
                                                    self.instance_listener)
            self.schedule_action(list(dir_mapping.keys()))

    def shutdown(self):
        with self.lock:
            log.debug('FSPolicy.shutdown')
            self.stopping.set()
            log.debug('canceling update timer task')
            self.timer_task.cancel()
            log.debug('update timer task canceled')
        if self.instance_watcher:
            log.debug('stopping instance watcher')
            self.instance_watcher.wait_and_stop()
            log.debug('instance watcher stopped')
        self.op_tracker.wait_for_ops()
        log.debug('FSPolicy.shutdown done')

    def handle_update_mapping(self, updates, removals, request_id, callback, r):
        log.info(f'handle_update_mapping: {updates} {removals} {request_id} {callback} {r}')
        with self.lock:
            try:
                self.async_requests.pop(request_id)
                if callback:
                    callback(updates, removals, r)
            finally:
                self.op_tracker.finish_async_op()

    def handle_update_instances(self, instances_added, instances_removed, request_id, r):
        log.info(f'handle_update_instances: {instances_added} {instances_removed} {request_id} {r}')
        with self.lock:
            try:
                self.async_requests.pop(request_id)
                if self.stopping.is_set():
                    log.debug('handle_update_instances: policy shutting down')
                    return
                schedules = []
                if instances_removed:
                    schedules.extend(self.policy.remove_instances(instances_removed))
                if instances_added:
                    schedules.extend(self.policy.add_instances(instances_added))
                self.schedule_action(schedules)
            finally:
                self.op_tracker.finish_async_op()

    def update_mapping(self, update_map, removals, callback=None):
        log.info(f'updating directory map: {len(update_map)}+{len(removals)} updates')
        request_id = str(uuid.uuid4())
        def async_callback(r):
            self.finisher.queue(self.handle_update_mapping,
                                [list(update_map.keys()), removals, request_id, callback, r])
        request = UpdateDirMapRequest(self.ioctx, update_map.copy(), removals.copy(), async_callback)
        self.async_requests[request_id] = request
        self.op_tracker.start_async_op()
        log.debug(f'async request_id: {request_id}')
        request.send()

    def update_instances(self, added, removed):
        log.debug(f'update_instances: added={added}, removed={removed}')
        for instance_id, addr in removed.items():
            log.info(f'blocklisting instance_id: {instance_id} addr: {addr}')
            blocklist(self.mgr, addr)
        with self.lock:
            instances_added = {}
            instances_removed = []
            for instance_id, addr in added.items():
                instances_added[instance_id] = {'version': 1, 'addr': addr}
            instances_removed = list(removed.keys())
            request_id = str(uuid.uuid4())
            def async_callback(r):
                self.finisher.queue(self.handle_update_instances,
                                    [list(instances_added.keys()), instances_removed, request_id, r])
            # blocklisted instances can be removed at this point. remapping directories
            # mapped to blocklisted instances on module startup is handled in policy
            # add_instances().
            request = UpdateInstanceRequest(self.ioctx, instances_added.copy(),
                                            instances_removed.copy(), async_callback)
            self.async_requests[request_id] = request
            log.debug(f'async request_id: {request_id}')
            self.op_tracker.start_async_op()
            request.send()

    def continue_action(self, updates, removals, r):
        log.debug(f'continuing action: {updates}+{removals} r={r}')
        if self.stopping.is_set():
            log.debug('continue_action: policy shutting down')
            return
        schedules = []
        for dir_path in updates:
            schedule = self.policy.finish_action(dir_path, r)
            if schedule:
                schedules.append(dir_path)
        for dir_path in removals:
            schedule = self.policy.finish_action(dir_path, r)
            if schedule:
                schedules.append(dir_path)
        self.schedule_action(schedules)

    def handle_peer_ack(self, dir_path, r):
        log.info(f'handle_peer_ack: {dir_path} r={r}')
        with self.lock:
            try:
                if self.stopping.is_set():
                    log.debug('handle_peer_ack: policy shutting down')
                    return
                self.continue_action([dir_path], [], r)
            finally:
                self.op_tracker.finish_async_op()

    def process_updates(self):
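        """Drain self.dir_paths (filled via schedule_action) and act on each
        entry: persist MAP_UPDATE/MAP_REMOVE changes through update_mapping()
        and send acquire/release notifications to the mapped instance.
        Invoked periodically by the RTimer started in __init__."""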
        def acquire_message(dir_path):
            return json.dumps({'dir_path': dir_path,
                               'mode': 'acquire'
                               })
        def release_message(dir_path):
            return json.dumps({'dir_path': dir_path,
                               'mode': 'release'
                               })
        with self.lock:
            if not self.dir_paths or self.stopping.is_set():
                return
            update_map = {}
            removals = []
            notifies = {}
            instance_purges = []
            for dir_path in self.dir_paths:
                action_type = self.policy.start_action(dir_path)
                lookup_info = self.policy.lookup(dir_path)
                log.debug(f'processing action: dir_path: {dir_path}, lookup_info: {lookup_info}, action_type: {action_type}')
                if action_type == ActionType.NONE:
                    continue
                elif action_type == ActionType.MAP_UPDATE:
                    # take care to not overwrite purge status
                    update_map[dir_path] = {'version': 1,
                                            'instance_id': lookup_info['instance_id'],
                                            'last_shuffled': lookup_info['mapped_time']
                                            }
                    if lookup_info['purging']:
                        update_map[dir_path]['purging'] = 1
                elif action_type == ActionType.MAP_REMOVE:
                    removals.append(dir_path)
                elif action_type == ActionType.ACQUIRE:
                    notifies[dir_path] = (lookup_info['instance_id'], acquire_message(dir_path))
                elif action_type == ActionType.RELEASE:
                    notifies[dir_path] = (lookup_info['instance_id'], release_message(dir_path))
            if update_map or removals:
                self.update_mapping(update_map, removals, callback=self.continue_action)
            for dir_path, message in notifies.items():
                self.op_tracker.start_async_op()
                self.notifier.notify(dir_path, message, self.handle_peer_ack)
            self.dir_paths.clear()

    def add_dir(self, dir_path):
        with self.lock:
            lookup_info = self.policy.lookup(dir_path)
            if lookup_info:
                if lookup_info['purging']:
                    raise MirrorException(-errno.EAGAIN, f'remove in-progress for {dir_path}')
                else:
                    raise MirrorException(-errno.EEXIST, f'directory {dir_path} is already tracked')
            schedule = self.policy.add_dir(dir_path)
            if not schedule:
                return
            update_map = {dir_path: {'version': 1, 'instance_id': '', 'last_shuffled': 0.0}}
            updated = False
            def update_safe(updates, removals, r):
                nonlocal updated
                updated = True
                self.cond.notify_all()
            self.update_mapping(update_map, [], callback=update_safe)
            self.cond.wait_for(lambda: updated)
            self.schedule_action([dir_path])

    def remove_dir(self, dir_path):
        with self.lock:
            lookup_info = self.policy.lookup(dir_path)
            if not lookup_info:
                raise MirrorException(-errno.ENOENT, f'directory {dir_path} is not tracked')
            if lookup_info['purging']:
                raise MirrorException(-errno.EINVAL, f'directory {dir_path} is under removal')
            update_map = {dir_path: {'version': 1,
                                     'instance_id': lookup_info['instance_id'],
                                     'last_shuffled': lookup_info['mapped_time'],
                                     'purging': 1}}
            updated = False
            sync_lock = threading.Lock()
            sync_cond = threading.Condition(sync_lock)
            def update_safe(r):
                with sync_lock:
                    nonlocal updated
                    updated = True
                    sync_cond.notify_all()
            request = UpdateDirMapRequest(self.ioctx, update_map.copy(), [], update_safe)
            request.send()
            with sync_lock:
                sync_cond.wait_for(lambda: updated)
            schedule = self.policy.remove_dir(dir_path)
            if schedule:
                self.schedule_action([dir_path])

    def status(self, dir_path):
        with self.lock:
            res = self.policy.dir_status(dir_path)
            return 0, json.dumps(res, indent=4, sort_keys=True), ''

    def summary(self):
        with self.lock:
            res = self.policy.instance_summary()
            return 0, json.dumps(res, indent=4, sort_keys=True), ''

class FSSnapshotMirror:
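    """Top-level snapshot mirroring manager.

    Keeps one FSPolicy per mirrored filesystem (pool_policy), reacts to
    fs_map notifications, and implements the mirroring operations; handlers
    return the usual (rc, stdout, stderr) mgr command tuple.
    """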
    PEER_CONFIG_KEY_PREFIX = "cephfs/mirror/peer"

    def __init__(self, mgr):
        self.mgr = mgr
        self.rados = mgr.rados
        self.pool_policy = {}
        self.fs_map = self.mgr.get('fs_map')
        self.lock = threading.Lock()
        self.refresh_pool_policy()
        self.local_fs = CephfsClient(mgr)

    def notify(self, notify_type):
        log.debug(f'got notify type {notify_type}')
        if notify_type == 'fs_map':
            with self.lock:
                self.fs_map = self.mgr.get('fs_map')
                self.refresh_pool_policy_locked()

    @staticmethod
    def make_spec(client_name, cluster_name):
        return f'{client_name}@{cluster_name}'

    @staticmethod
    def split_spec(spec):
        try:
            client_id, cluster_name = spec.split('@')
            _, client_name = client_id.split('.')
            return client_name, cluster_name
        except ValueError:
            raise MirrorException(-errno.EINVAL, f'invalid cluster spec {spec}')

    @staticmethod
    def get_metadata_pool(filesystem, fs_map):
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['mdsmap']['metadata_pool']
        return None

    @staticmethod
    def get_filesystem_id(filesystem, fs_map):
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['id']
        return None

    @staticmethod
    def peer_config_key(filesystem, peer_uuid):
        return f'{FSSnapshotMirror.PEER_CONFIG_KEY_PREFIX}/{filesystem}/{peer_uuid}'

    def config_set(self, key, val=None):
        """set or remove a key from mon config store"""
        if val:
            cmd = {'prefix': 'config-key set',
                   'key': key, 'val': val}
        else:
            cmd = {'prefix': 'config-key rm',
                   'key': key}
        r, outs, err = self.mgr.mon_command(cmd)
        if r < 0:
            log.error(f'mon command to set/remove config-key {key} failed: {err}')
            raise Exception(-errno.EINVAL)

    def config_get(self, key):
        """fetch a config key value from mon config store"""
        cmd = {'prefix': 'config-key get', 'key': key}
        r, outs, err = self.mgr.mon_command(cmd)
        if r < 0 and not r == -errno.ENOENT:
            log.error(f'mon command to get config-key {key} failed: {err}')
            raise Exception(-errno.EINVAL)
        val = {}
        if r == 0:
            val = json.loads(outs)
        return val

    def filesystem_exist(self, filesystem):
        for fs in self.fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return True
        return False

    def get_mirrored_filesystems(self):
        return [fs['mdsmap']['fs_name'] for fs in self.fs_map['filesystems'] if fs.get('mirror_info', None)]

    def get_filesystem_peers(self, filesystem):
        """To be used when mirroring is enabled for the filesystem"""
        for fs in self.fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['mirror_info']['peers']
        return None

    def peer_exists(self, filesystem, remote_cluster_spec, remote_fs_name):
        peers = self.get_filesystem_peers(filesystem)
        for _, rem in peers.items():
            remote = rem['remote']
            spec = FSSnapshotMirror.make_spec(remote['client_name'], remote['cluster_name'])
            if spec == remote_cluster_spec and remote['fs_name'] == remote_fs_name:
                return True
        return False

    @staticmethod
    def get_mirror_info(fs):
        try:
            val = fs.getxattr('/', 'ceph.mirror.info')
            match = re.search(r'^cluster_id=([a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}) fs_id=(\d+)$',
                              val.decode('utf-8'))
            if match and len(match.groups()) == 2:
                return {'cluster_id': match.group(1),
                        'fs_id': int(match.group(2))
                        }
            raise MirrorException(-errno.EINVAL, 'invalid ceph.mirror.info value format')
        except cephfs.Error as e:
            raise MirrorException(-e.errno, 'error fetching ceph.mirror.info xattr')

    @staticmethod
    def set_mirror_info(local_cluster_id, local_fsid, remote_fs):
        log.info(f'setting {local_cluster_id}::{local_fsid} on remote')
        try:
            remote_fs.setxattr('/', 'ceph.mirror.info',
                               f'cluster_id={local_cluster_id} fs_id={local_fsid}'.encode('utf-8'), os.XATTR_CREATE)
        except cephfs.Error as e:
            if e.errno == errno.EEXIST:
                try:
                    mi = FSSnapshotMirror.get_mirror_info(remote_fs)
                except MirrorException:
                    # if mirror info cannot be fetched for some reason, let's just
                    # fail.
                    raise MirrorException(-errno.EEXIST, 'already an active peer')
                cluster_id = mi['cluster_id']
                fs_id = mi['fs_id']
                if not (cluster_id == local_cluster_id and fs_id == local_fsid):
                    raise MirrorException(-errno.EEXIST, f'peer mirrored by: (cluster_id: {cluster_id}, fs_id: {fs_id})')
            else:
                log.error(f'error setting mirrored fsid: {e}')
                raise Exception(-e.errno)

    def resolve_peer(self, fs_name, peer_uuid):
        peers = self.get_filesystem_peers(fs_name)
        for peer, rem in peers.items():
            if peer == peer_uuid:
                return rem['remote']
        return None

    def purge_mirror_info(self, local_fs_name, peer_uuid):
        log.debug(f'local fs={local_fs_name} peer_uuid={peer_uuid}')
        # resolve the peer to its spec
        rem = self.resolve_peer(local_fs_name, peer_uuid)
        if not rem:
            return
        log.debug(f'peer_uuid={peer_uuid} resolved to {rem}')
        _, client_name = rem['client_name'].split('.')

        # fetch auth details from config store
        remote_conf = self.config_get(FSSnapshotMirror.peer_config_key(local_fs_name, peer_uuid))
        remote_cluster, remote_fs = connect_to_filesystem(client_name,
                                                          rem['cluster_name'],
                                                          rem['fs_name'], 'remote', conf_dct=remote_conf)
        try:
            remote_fs.removexattr('/', 'ceph.mirror.info')
        except cephfs.Error as e:
            if not e.errno == errno.ENOENT:
                log.error('error removing mirror info')
                raise Exception(-e.errno)
        finally:
            disconnect_from_filesystem(rem['cluster_name'], rem['fs_name'], remote_cluster, remote_fs)

    def verify_and_set_mirror_info(self, local_fs_name, remote_cluster_spec, remote_fs_name, remote_conf={}):
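        """Connect to the remote filesystem, check that the bootstrap token
        (if any) matches the remote cluster FSID and that neither side is
        already an active peer of the other, then record this cluster and
        filesystem in the remote's ceph.mirror.info xattr."""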
        log.debug(f'local fs={local_fs_name} remote={remote_cluster_spec}/{remote_fs_name}')

        client_name, cluster_name = FSSnapshotMirror.split_spec(remote_cluster_spec)
        remote_cluster, remote_fs = connect_to_filesystem(client_name, cluster_name, remote_fs_name,
                                                          'remote', conf_dct=remote_conf)
        try:
            local_cluster_id = self.rados.get_fsid()
            remote_cluster_id = remote_cluster.get_fsid()
            log.debug(f'local_cluster_id={local_cluster_id} remote_cluster_id={remote_cluster_id}')
            if 'fsid' in remote_conf:
                if not remote_cluster_id == remote_conf['fsid']:
                    raise MirrorException(-errno.EINVAL, 'FSID mismatch between bootstrap token and remote cluster')

            local_fscid = remote_fscid = None
            with open_filesystem(self.local_fs, local_fs_name) as local_fsh:
                local_fscid = local_fsh.get_fscid()
                remote_fscid = remote_fs.get_fscid()
                log.debug(f'local_fscid={local_fscid} remote_fscid={remote_fscid}')
                mi = None
                try:
                    mi = FSSnapshotMirror.get_mirror_info(local_fsh)
                except MirrorException as me:
                    if me.args[0] != -errno.ENODATA:
                        raise Exception(-errno.EINVAL)
                if mi and mi['cluster_id'] == remote_cluster_id and mi['fs_id'] == remote_fscid:
                    raise MirrorException(-errno.EINVAL, f'file system is an active peer for file system: {remote_fs_name}')

            if local_cluster_id == remote_cluster_id and local_fscid == remote_fscid:
                raise MirrorException(-errno.EINVAL, "Source and destination cluster fsid and "
                                      "file-system name can't be the same")
            FSSnapshotMirror.set_mirror_info(local_cluster_id, local_fscid, remote_fs)
        finally:
            disconnect_from_filesystem(cluster_name, remote_fs_name, remote_cluster, remote_fs)

    def init_pool_policy(self, filesystem):
        metadata_pool_id = FSSnapshotMirror.get_metadata_pool(filesystem, self.fs_map)
        if not metadata_pool_id:
            log.error(f'cannot find metadata pool-id for filesystem {filesystem}')
            raise Exception(-errno.EINVAL)
        try:
            ioctx = self.rados.open_ioctx2(metadata_pool_id)
            # TODO: make async if required
            dir_mapping = load_dir_map(ioctx)
            instances = load_instances(ioctx)
            # init policy
            fspolicy = FSPolicy(self.mgr, ioctx)
            log.debug(f'init policy for filesystem {filesystem}: pool-id {metadata_pool_id}')
            fspolicy.init(dir_mapping, instances)
            self.pool_policy[filesystem] = fspolicy
        except rados.Error as e:
            log.error(f'failed to access pool-id {metadata_pool_id} for filesystem {filesystem}: {e}')
            raise Exception(-e.errno)

    def refresh_pool_policy_locked(self):
        filesystems = self.get_mirrored_filesystems()
        log.debug(f'refreshing policy for {filesystems}')
        for filesystem in list(self.pool_policy):
            if filesystem not in filesystems:
                log.info(f'shutdown pool policy for {filesystem}')
                fspolicy = self.pool_policy.pop(filesystem)
                fspolicy.shutdown()
        for filesystem in filesystems:
            if filesystem not in self.pool_policy:
                log.info(f'init pool policy for {filesystem}')
                self.init_pool_policy(filesystem)

    def refresh_pool_policy(self):
        with self.lock:
            self.refresh_pool_policy_locked()

    def enable_mirror(self, filesystem):
        log.info(f'enabling mirror for filesystem {filesystem}')
        with self.lock:
            try:
                metadata_pool_id = FSSnapshotMirror.get_metadata_pool(filesystem, self.fs_map)
                if not metadata_pool_id:
                    log.error(f'cannot find metadata pool-id for filesystem {filesystem}')
                    raise Exception(-errno.EINVAL)
                create_mirror_object(self.rados, metadata_pool_id)
                cmd = {'prefix': 'fs mirror enable', 'fs_name': filesystem}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to enable mirror failed: {err}')
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
            except MirrorException as me:
                return me.args[0], '', me.args[1]
            except Exception as e:
                return e.args[0], '', 'failed to enable mirroring'

    def disable_mirror(self, filesystem):
        log.info(f'disabling mirror for filesystem {filesystem}')
        try:
            with self.lock:
                cmd = {'prefix': 'fs mirror disable', 'fs_name': filesystem}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to disable mirror failed: {err}')
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to disable mirroring'

    def peer_list(self, filesystem):
        try:
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                peers = self.get_filesystem_peers(filesystem)
                peer_res = {}
                for peer_uuid, rem in peers.items():
                    conf = self.config_get(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid))
                    remote = rem['remote']
                    peer_res[peer_uuid] = {'client_name': remote['client_name'],
                                           'site_name': remote['cluster_name'],
                                           'fs_name': remote['fs_name']
                                           }
                    if 'mon_host' in conf:
                        peer_res[peer_uuid]['mon_host'] = conf['mon_host']
                return 0, json.dumps(peer_res), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to list peers'

    def peer_add(self, filesystem, remote_cluster_spec, remote_fs_name, remote_conf):
        try:
            if remote_fs_name is None:
                remote_fs_name = filesystem
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                ### peer updates for key, site-name are not yet supported
                if self.peer_exists(filesystem, remote_cluster_spec, remote_fs_name):
                    return 0, json.dumps({}), ''
                # _own_ the peer
                self.verify_and_set_mirror_info(filesystem, remote_cluster_spec, remote_fs_name, remote_conf)
                # unique peer uuid
                peer_uuid = str(uuid.uuid4())
                config_key = FSSnapshotMirror.peer_config_key(filesystem, peer_uuid)
                if remote_conf.get('mon_host') and remote_conf.get('key'):
                    self.config_set(config_key, json.dumps(remote_conf))
                cmd = {'prefix': 'fs mirror peer_add',
                       'fs_name': filesystem,
                       'uuid': peer_uuid,
                       'remote_cluster_spec': remote_cluster_spec,
                       'remote_fs_name': remote_fs_name}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to add peer failed: {err}')
                    try:
                        log.debug(f'cleaning up config-key for {peer_uuid}')
                        self.config_set(config_key)
                    except Exception:
                        pass
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to add peer'

    def peer_remove(self, filesystem, peer_uuid):
        try:
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                # ok, this is being a bit lazy. remove mirror info from peer followed
                # by purging the peer from fsmap. if the mirror daemon fs map updates
                # are laggy, they happily continue to synchronize. ideally, we should
                # purge the peer from fsmap here and purge mirror info on fsmap update
                # (in notify()). but that's not straightforward -- before purging mirror
                # info, we would need to wait for all mirror daemons to catch up with
                # fsmap updates. this involves mirror daemons sending the fsmap epoch
                # they have seen in reply to a notify request. TODO: fix this.
                self.purge_mirror_info(filesystem, peer_uuid)
                cmd = {'prefix': 'fs mirror peer_remove',
                       'fs_name': filesystem,
                       'uuid': peer_uuid}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to remove peer failed: {err}')
                    raise Exception(-errno.EINVAL)
                self.config_set(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid))
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to remove peer'

    def peer_bootstrap_create(self, fs_name, client_name, site_name):
        """create a bootstrap token for this peer filesystem"""
        try:
            with self.lock:
                cmd = {'prefix': 'fs authorize',
                       'filesystem': fs_name,
                       'entity': client_name,
                       'caps': ['/', 'rwps']}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to create peer user failed: {err}')
                    raise Exception(-errno.EINVAL)
                cmd = {'prefix': 'auth get',
                       'entity': client_name,
                       'format': 'json'}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to fetch keyring failed: {err}')
                    raise Exception(-errno.EINVAL)
                outs = json.loads(outs)
                outs0 = outs[0]
                token_dct = {'fsid': self.mgr.rados.get_fsid(),
                             'filesystem': fs_name,
                             'user': outs0['entity'],
                             'site_name': site_name,
                             'key': outs0['key'],
                             'mon_host': self.mgr.rados.conf_get('mon_host')}
                token_str = json.dumps(token_dct).encode('utf-8')
                encoded_token = base64.b64encode(token_str)
                return 0, json.dumps({'token': encoded_token.decode('utf-8')}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to bootstrap peer'

    def peer_bootstrap_import(self, filesystem, token):
        try:
            token_str = base64.b64decode(token)
            token_dct = json.loads(token_str.decode('utf-8'))
        except Exception:
            return -errno.EINVAL, '', 'failed to parse token'
        client_name = token_dct.pop('user')
        cluster_name = token_dct.pop('site_name')
        remote_fs_name = token_dct.pop('filesystem')
        remote_cluster_spec = f'{client_name}@{cluster_name}'
        return self.peer_add(filesystem, remote_cluster_spec, remote_fs_name, token_dct)

    @staticmethod
    def norm_path(dir_path):
        if not os.path.isabs(dir_path):
            raise MirrorException(-errno.EINVAL, f'{dir_path} should be an absolute path')
        return os.path.normpath(dir_path)

    def add_dir(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                log.debug(f'path normalized to {dir_path}')
                fspolicy.add_dir(dir_path)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to add directory'

    def remove_dir(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                fspolicy.remove_dir(dir_path)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to remove directory'

    def status(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                return fspolicy.status(dir_path)
        except MirrorException as me:
            return me.args[0], '', me.args[1]

    def show_distribution(self, filesystem):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                return fspolicy.summary()
        except MirrorException as me:
            return me.args[0], '', me.args[1]

    def daemon_status(self, filesystem):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                daemons = []
                sm = self.mgr.get('service_map')
                daemon_entry = sm['services'].get('cephfs-mirror', None)
                log.debug(f'daemon_entry: {daemon_entry}')
                if daemon_entry is not None:
                    for daemon_key in daemon_entry.get('daemons', []):
                        try:
                            daemon_id = int(daemon_key)
                        except ValueError:
                            continue
                        daemon = {
                            'daemon_id' : daemon_id,
                            'filesystems' : []
                        } # type: Dict[str, Any]
                        daemon_status = self.mgr.get_daemon_status('cephfs-mirror', daemon_key)
                        if not daemon_status:
                            log.debug(f'daemon status not yet available for cephfs-mirror daemon: {daemon_key}')
                            continue
                        status = json.loads(daemon_status['status_json'])
                        for fs_id, fs_desc in status.items():
                            fs = {'filesystem_id' : int(fs_id),
                                  'name' : fs_desc['name'],
                                  'directory_count' : fs_desc['directory_count'],
                                  'peers' : []
                                  } # type: Dict[str, Any]
                            for peer_uuid, peer_desc in fs_desc['peers'].items():
                                peer = {
                                    'uuid' : peer_uuid,
                                    'remote' : peer_desc['remote'],
                                    'stats' : peer_desc['stats']
                                }
                                fs['peers'].append(peer)
                            daemon['filesystems'].append(fs)
                        daemons.append(daemon)
                return 0, json.dumps(daemons), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
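# ---------------------------------------------------------------------------
# Usage sketch (illustrative only; the real wiring lives in the mirroring
# plugin's module code, and the call sites shown here are assumptions):
#
#   mirror = FSSnapshotMirror(mgr)           # constructed once with the MgrModule
#   mirror.notify('fs_map')                  # forwarded on fs_map notifications
#   rc, out, err = mirror.enable_mirror('cephfs')
#   rc, out, err = mirror.peer_bootstrap_import('cephfs', token)
#   rc, out, err = mirror.add_dir('cephfs', '/projects')
# ---------------------------------------------------------------------------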