import base64
import errno
import json
import logging
import os
import re
import threading
import uuid

import cephfs
import rados

from typing import Dict, Any

from mgr_util import RTimer, CephfsClient, open_filesystem, \
    CephfsConnectionException
from .blocklist import blocklist
from .notify import Notifier, InstanceWatcher
from .utils import INSTANCE_ID_PREFIX, MIRROR_OBJECT_NAME, Finisher, \
    AsyncOpTracker, connect_to_filesystem, disconnect_from_filesystem
from .exception import MirrorException
from .dir_map.create import create_mirror_object
from .dir_map.load import load_dir_map, load_instances
from .dir_map.update import UpdateDirMapRequest, UpdateInstanceRequest
from .dir_map.policy import Policy
from .dir_map.state_transition import ActionType

log = logging.getLogger(__name__)

CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL = 1

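
# FSPolicy tracks the directory-to-mirror-instance mapping for one mirrored
# filesystem. Directory actions are not applied immediately: callers queue
# them via schedule_action() and a periodic timer (firing every
# CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL seconds) batches them in
# process_updates(). Map changes are persisted asynchronously through
# UpdateDirMapRequest/UpdateInstanceRequest, and mirror daemon instances are
# notified to acquire or release directories.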
class FSPolicy:
    class InstanceListener(InstanceWatcher.Listener):
        def __init__(self, fspolicy):
            self.fspolicy = fspolicy

        def handle_instances(self, added, removed):
            self.fspolicy.update_instances(added, removed)

    def __init__(self, mgr, ioctx):
        self.mgr = mgr
        self.ioctx = ioctx
        self.dir_paths = []
        self.policy = Policy()
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.async_requests = {}
        self.finisher = Finisher()
        self.op_tracker = AsyncOpTracker()
        self.notifier = Notifier(ioctx)
        self.instance_listener = FSPolicy.InstanceListener(self)
        self.instance_watcher = None
        self.stopping = threading.Event()
        # periodic task that drains self.dir_paths (see process_updates())
        self.timer_task = RTimer(CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL,
                                 self.process_updates)
        self.timer_task.start()

    def schedule_action(self, dir_paths):
        self.dir_paths.extend(dir_paths)

    def init(self, dir_mapping, instances):
        with self.lock:
            self.policy.init(dir_mapping)
            # we'll schedule action for all directories, so don't bother capturing
            # directory names here.
            self.policy.add_instances(list(instances.keys()), initial_update=True)
            self.instance_watcher = InstanceWatcher(self.ioctx, instances,
                                                    self.instance_listener)
            self.schedule_action(list(dir_mapping.keys()))

    def shutdown(self):
        log.debug('FSPolicy.shutdown')
        log.debug('canceling update timer task')
        self.timer_task.cancel()
        log.debug('update timer task canceled')
        if self.instance_watcher:
            log.debug('stopping instance watcher')
            self.instance_watcher.wait_and_stop()
            log.debug('stopped instance watcher')
        self.op_tracker.wait_for_ops()
        log.debug('FSPolicy.shutdown done')

    def handle_update_mapping(self, updates, removals, request_id, callback, r):
        log.info(f'handle_update_mapping: {updates} {removals} {request_id} {callback} {r}')
        with self.lock:
            try:
                self.async_requests.pop(request_id)
                if callback:
                    callback(updates, removals, r)
            finally:
                self.op_tracker.finish_async_op()

    def handle_update_instances(self, instances_added, instances_removed, request_id, r):
        log.info(f'handle_update_instances: {instances_added} {instances_removed} {request_id} {r}')
        with self.lock:
            try:
                self.async_requests.pop(request_id)
                if self.stopping.is_set():
                    log.debug(f'handle_update_instances: policy shutting down')
                    return
                schedules = []
                if instances_removed:
                    schedules.extend(self.policy.remove_instances(instances_removed))
                if instances_added:
                    schedules.extend(self.policy.add_instances(instances_added))
                self.schedule_action(schedules)
            finally:
                self.op_tracker.finish_async_op()

    def update_mapping(self, update_map, removals, callback=None):
        log.info(f'updating directory map: {len(update_map)}+{len(removals)} updates')
        request_id = str(uuid.uuid4())
        def async_callback(r):
            self.finisher.queue(self.handle_update_mapping,
                                [list(update_map.keys()), removals, request_id, callback, r])
        request = UpdateDirMapRequest(self.ioctx, update_map.copy(), removals.copy(), async_callback)
        self.async_requests[request_id] = request
        self.op_tracker.start_async_op()
        log.debug(f'async request_id: {request_id}')
        # the request object (from .dir_map.update) is expected to expose a
        # send() that submits the asynchronous omap update.
        request.send()

    def update_instances(self, added, removed):
        log.debug(f'update_instances: added={added}, removed={removed}')
        for instance_id, addr in removed.items():
            log.info(f'blocklisting instance_id: {instance_id} addr: {addr}')
            blocklist(self.mgr, addr)
        instances_added = {}
        instances_removed = []
        for instance_id, addr in added.items():
            instances_added[instance_id] = {'version': 1, 'addr': addr}
        instances_removed = list(removed.keys())
        request_id = str(uuid.uuid4())
        def async_callback(r):
            self.finisher.queue(self.handle_update_instances,
                                [list(instances_added.keys()), instances_removed, request_id, r])
        # blocklisted instances can be removed at this point. remapping directories
        # mapped to blocklisted instances on module startup is handled in policy.
        request = UpdateInstanceRequest(self.ioctx, instances_added.copy(),
                                        instances_removed.copy(), async_callback)
        self.async_requests[request_id] = request
        log.debug(f'async request_id: {request_id}')
        self.op_tracker.start_async_op()
        request.send()

    def continue_action(self, updates, removals, r):
        log.debug(f'continuing action: {updates}+{removals} r={r}')
        if self.stopping.is_set():
            log.debug('continue_action: policy shutting down')
            return
        schedules = []
        for dir_path in updates:
            schedule = self.policy.finish_action(dir_path, r)
            if schedule:
                schedules.append(dir_path)
        for dir_path in removals:
            schedule = self.policy.finish_action(dir_path, r)
            if schedule:
                schedules.append(dir_path)
        self.schedule_action(schedules)

    def handle_peer_ack(self, dir_path, r):
        log.info(f'handle_peer_ack: {dir_path} r={r}')
        with self.lock:
            try:
                if self.stopping.is_set():
                    log.debug(f'handle_peer_ack: policy shutting down')
                    return
                self.continue_action([dir_path], [], r)
            finally:
                self.op_tracker.finish_async_op()

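    # Batched directory actions are drained here on the periodic timer. For
    # each queued path the policy reports an action type: MAP_UPDATE and
    # MAP_REMOVE feed a single UpdateDirMapRequest, while ACQUIRE and RELEASE
    # turn into notifications to the owning mirror daemon instance, which are
    # acknowledged back through handle_peer_ack().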
    def process_updates(self):
        def acquire_message(dir_path):
            return json.dumps({'dir_path': dir_path,
                               'mode': 'acquire'})
        def release_message(dir_path):
            return json.dumps({'dir_path': dir_path,
                               'mode': 'release'})
        with self.lock:
            if not self.dir_paths or self.stopping.is_set():
                return
            update_map = {}
            removals = []
            notifies = {}
            for dir_path in self.dir_paths:
                action_type = self.policy.start_action(dir_path)
                lookup_info = self.policy.lookup(dir_path)
                log.debug(f'processing action: dir_path: {dir_path}, lookup_info: {lookup_info}, action_type: {action_type}')
                if action_type == ActionType.NONE:
                    continue
                elif action_type == ActionType.MAP_UPDATE:
                    # take care to not overwrite purge status
                    update_map[dir_path] = {'version': 1,
                                            'instance_id': lookup_info['instance_id'],
                                            'last_shuffled': lookup_info['mapped_time']
                                            }
                    if lookup_info['purging']:
                        update_map[dir_path]['purging'] = 1
                elif action_type == ActionType.MAP_REMOVE:
                    removals.append(dir_path)
                elif action_type == ActionType.ACQUIRE:
                    notifies[dir_path] = (lookup_info['instance_id'], acquire_message(dir_path))
                elif action_type == ActionType.RELEASE:
                    notifies[dir_path] = (lookup_info['instance_id'], release_message(dir_path))
            if update_map or removals:
                self.update_mapping(update_map, removals, callback=self.continue_action)
            for dir_path, message in notifies.items():
                self.op_tracker.start_async_op()
                self.notifier.notify(dir_path, message, self.handle_peer_ack)
            self.dir_paths.clear()

    def add_dir(self, dir_path):
        with self.cond:
            lookup_info = self.policy.lookup(dir_path)
            if lookup_info:
                if lookup_info['purging']:
                    raise MirrorException(-errno.EAGAIN, f'remove in-progress for {dir_path}')
                raise MirrorException(-errno.EEXIST, f'directory {dir_path} is already tracked')
            schedule = self.policy.add_dir(dir_path)
            if not schedule:
                return
            update_map = {dir_path: {'version': 1, 'instance_id': '', 'last_shuffled': 0.0}}
            updated = False
            def update_safe(updates, removals, r):
                nonlocal updated
                updated = True
                self.cond.notifyAll()
            self.update_mapping(update_map, [], callback=update_safe)
            self.cond.wait_for(lambda: updated)
            self.schedule_action([dir_path])

    def remove_dir(self, dir_path):
        with self.lock:
            lookup_info = self.policy.lookup(dir_path)
            if not lookup_info:
                raise MirrorException(-errno.ENOENT, f'directory {dir_path} is not tracked')
            if lookup_info['purging']:
                raise MirrorException(-errno.EINVAL, f'directory {dir_path} is under removal')
            update_map = {dir_path: {'version': 1,
                                     'instance_id': lookup_info['instance_id'],
                                     'last_shuffled': lookup_info['mapped_time'],
                                     'purging': 1}}
            updated = False
            sync_lock = threading.Lock()
            sync_cond = threading.Condition(sync_lock)
            def update_safe(r):
                nonlocal updated
                with sync_lock:
                    updated = True
                    sync_cond.notifyAll()
            request = UpdateDirMapRequest(self.ioctx, update_map.copy(), [], update_safe)
            request.send()
            with sync_lock:
                sync_cond.wait_for(lambda: updated)
            schedule = self.policy.remove_dir(dir_path)
            if schedule:
                self.schedule_action([dir_path])

    def status(self, dir_path):
        with self.lock:
            res = self.policy.dir_status(dir_path)
            return 0, json.dumps(res, indent=4, sort_keys=True), ''

    def summary(self):
        with self.lock:
            res = self.policy.instance_summary()
            return 0, json.dumps(res, indent=4, sort_keys=True), ''

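
# FSSnapshotMirror is the entry point used by the manager module: it watches
# the fs_map for filesystems with mirroring enabled, keeps one FSPolicy per
# mirrored filesystem (keyed in pool_policy), and implements the peer and
# directory management commands by driving mon commands and the per-pool
# policy objects.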
class FSSnapshotMirror:
    PEER_CONFIG_KEY_PREFIX = "cephfs/mirror/peer"

    def __init__(self, mgr):
        self.mgr = mgr
        self.rados = mgr.rados
        self.pool_policy = {}
        self.fs_map = self.mgr.get('fs_map')
        self.lock = threading.Lock()
        self.refresh_pool_policy()
        self.local_fs = CephfsClient(mgr)

    def notify(self, notify_type):
        log.debug(f'got notify type {notify_type}')
        if notify_type == 'fs_map':
            with self.lock:
                self.fs_map = self.mgr.get('fs_map')
                self.refresh_pool_policy_locked()

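    # Mirroring is enabled/disabled per filesystem via the 'fs mirror enable'
    # and 'fs mirror disable' mon commands below; the resulting fs_map change
    # comes back through notify('fs_map'), which refreshes pool_policy so
    # FSPolicy instances are created or shut down accordingly.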
    @staticmethod
    def make_spec(client_name, cluster_name):
        return f'{client_name}@{cluster_name}'

    @staticmethod
    def split_spec(spec):
        try:
            client_id, cluster_name = spec.split('@')
            _, client_name = client_id.split('.')
            return client_name, cluster_name
        except ValueError:
            raise MirrorException(-errno.EINVAL, f'invalid cluster spec {spec}')

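    # For example (with a made-up entity name): make_spec('client.mirror_remote', 'backup')
    # yields 'client.mirror_remote@backup', and split_spec('client.mirror_remote@backup')
    # returns ('mirror_remote', 'backup') -- the 'client.' prefix is stripped.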
    @staticmethod
    def get_metadata_pool(filesystem, fs_map):
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['mdsmap']['metadata_pool']
        return None

    @staticmethod
    def get_filesystem_id(filesystem, fs_map):
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['id']
        return None

    @staticmethod
    def peer_config_key(filesystem, peer_uuid):
        return f'{FSSnapshotMirror.PEER_CONFIG_KEY_PREFIX}/{filesystem}/{peer_uuid}'

    def config_set(self, key, val=None):
        """set or remove a key from mon config store"""
        if val:
            cmd = {'prefix': 'config-key set',
                   'key': key, 'val': val}
        else:
            cmd = {'prefix': 'config-key rm',
                   'key': key}
        r, outs, err = self.mgr.mon_command(cmd)
        if r < 0:
            log.error(f'mon command to set/remove config-key {key} failed: {err}')
            raise Exception(-errno.EINVAL)

    def config_get(self, key):
        """fetch a config key value from mon config store"""
        cmd = {'prefix': 'config-key get', 'key': key}
        r, outs, err = self.mgr.mon_command(cmd)
        if r < 0 and not r == -errno.ENOENT:
            log.error(f'mon command to get config-key {key} failed: {err}')
            raise Exception(-errno.EINVAL)
        val = {}
        if outs:
            val = json.loads(outs)
        return val

    def filesystem_exist(self, filesystem):
        for fs in self.fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return True
        return False

    def get_mirrored_filesystems(self):
        return [fs['mdsmap']['fs_name'] for fs in self.fs_map['filesystems'] if fs.get('mirror_info', None)]

    def get_filesystem_peers(self, filesystem):
        """To be used when mirroring is enabled for the filesystem"""
        for fs in self.fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['mirror_info']['peers']
        return None

    def peer_exists(self, filesystem, remote_cluster_spec, remote_fs_name):
        peers = self.get_filesystem_peers(filesystem)
        for _, rem in peers.items():
            remote = rem['remote']
            spec = FSSnapshotMirror.make_spec(remote['client_name'], remote['cluster_name'])
            if spec == remote_cluster_spec and remote['fs_name'] == remote_fs_name:
                return True
        return False

    @staticmethod
    def get_mirror_info(fs):
        try:
            val = fs.getxattr('/', 'ceph.mirror.info')
            match = re.search(r'^cluster_id=([a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}) fs_id=(\d+)$',
                              val.decode('utf-8'))
            if match and len(match.groups()) == 2:
                return {'cluster_id': match.group(1),
                        'fs_id': int(match.group(2))
                        }
            raise MirrorException(-errno.EINVAL, 'invalid ceph.mirror.info value format')
        except cephfs.Error as e:
            raise MirrorException(-e.errno, 'error fetching ceph.mirror.info xattr')

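    # The ceph.mirror.info xattr on the filesystem root holds a plain string
    # of the form 'cluster_id=<cluster fsid (uuid)> fs_id=<fscid>'; the regex
    # above parses it back into a dict, and set_mirror_info() below writes it
    # with XATTR_CREATE so an existing value is never silently overwritten.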
    @staticmethod
    def set_mirror_info(local_cluster_id, local_fsid, remote_fs):
        log.info(f'setting {local_cluster_id}::{local_fsid} on remote')
        try:
            remote_fs.setxattr('/', 'ceph.mirror.info',
                               f'cluster_id={local_cluster_id} fs_id={local_fsid}'.encode('utf-8'), os.XATTR_CREATE)
        except cephfs.Error as e:
            if e.errno == errno.EEXIST:
                try:
                    mi = FSSnapshotMirror.get_mirror_info(remote_fs)
                    cluster_id = mi['cluster_id']
                    fs_id = mi['fs_id']
                    if not (cluster_id == local_cluster_id and fs_id == local_fsid):
                        raise MirrorException(-errno.EEXIST, f'peer mirrored by: (cluster_id: {cluster_id}, fs_id: {fs_id})')
                except MirrorException:
                    # if mirror info cannot be fetched for some reason, let's just
                    # treat the remote as an already active peer.
                    raise MirrorException(-errno.EEXIST, f'already an active peer')
            else:
                log.error(f'error setting mirrored fsid: {e}')
                raise Exception(-e.errno)

    def resolve_peer(self, fs_name, peer_uuid):
        peers = self.get_filesystem_peers(fs_name)
        for peer, rem in peers.items():
            if peer == peer_uuid:
                return rem['remote']
        return None

    def purge_mirror_info(self, local_fs_name, peer_uuid):
        log.debug(f'local fs={local_fs_name} peer_uuid={peer_uuid}')
        # resolve the peer to its spec
        rem = self.resolve_peer(local_fs_name, peer_uuid)
        if not rem:
            return
        log.debug(f'peer_uuid={peer_uuid} resolved to {rem}')
        _, client_name = rem['client_name'].split('.')

        # fetch auth details from config store
        remote_conf = self.config_get(FSSnapshotMirror.peer_config_key(local_fs_name, peer_uuid))
        remote_cluster, remote_fs = connect_to_filesystem(client_name,
                                                          rem['cluster_name'],
                                                          rem['fs_name'], 'remote', conf_dct=remote_conf)
        try:
            remote_fs.removexattr('/', 'ceph.mirror.info')
        except cephfs.Error as e:
            if not e.errno == errno.ENOENT:
                log.error('error removing mirror info')
                raise Exception(-e.errno)
        finally:
            disconnect_from_filesystem(rem['cluster_name'], rem['fs_name'], remote_cluster, remote_fs)

    def verify_and_set_mirror_info(self, local_fs_name, remote_cluster_spec, remote_fs_name, remote_conf={}):
        log.debug(f'local fs={local_fs_name} remote={remote_cluster_spec}/{remote_fs_name}')

        client_name, cluster_name = FSSnapshotMirror.split_spec(remote_cluster_spec)
        remote_cluster, remote_fs = connect_to_filesystem(client_name, cluster_name, remote_fs_name,
                                                          'remote', conf_dct=remote_conf)
        try:
            local_cluster_id = self.rados.get_fsid()
            remote_cluster_id = remote_cluster.get_fsid()
            log.debug(f'local_cluster_id={local_cluster_id} remote_cluster_id={remote_cluster_id}')
            if 'fsid' in remote_conf:
                if not remote_cluster_id == remote_conf['fsid']:
                    raise MirrorException(-errno.EINVAL, 'FSID mismatch between bootstrap token and remote cluster')

            local_fscid = remote_fscid = None
            with open_filesystem(self.local_fs, local_fs_name) as local_fsh:
                local_fscid = local_fsh.get_fscid()
                remote_fscid = remote_fs.get_fscid()
                log.debug(f'local_fscid={local_fscid} remote_fscid={remote_fscid}')
                mi = None
                try:
                    mi = FSSnapshotMirror.get_mirror_info(local_fsh)
                except MirrorException as me:
                    if me.args[0] != -errno.ENODATA:
                        raise Exception(-errno.EINVAL)
                if mi and mi['cluster_id'] == remote_cluster_id and mi['fs_id'] == remote_fscid:
                    raise MirrorException(-errno.EINVAL, f'file system is an active peer for file system: {remote_fs_name}')

            if local_cluster_id == remote_cluster_id and local_fscid == remote_fscid:
                raise MirrorException(-errno.EINVAL, "Source and destination cluster fsid and "
                                                     "file-system name can't be the same")
            FSSnapshotMirror.set_mirror_info(local_cluster_id, local_fscid, remote_fs)
        finally:
            disconnect_from_filesystem(cluster_name, remote_fs_name, remote_cluster, remote_fs)

    def init_pool_policy(self, filesystem):
        metadata_pool_id = FSSnapshotMirror.get_metadata_pool(filesystem, self.fs_map)
        if not metadata_pool_id:
            log.error(f'cannot find metadata pool-id for filesystem {filesystem}')
            raise Exception(-errno.EINVAL)
        try:
            ioctx = self.rados.open_ioctx2(metadata_pool_id)
            # TODO: make async if required
            dir_mapping = load_dir_map(ioctx)
            instances = load_instances(ioctx)

            fspolicy = FSPolicy(self.mgr, ioctx)
            log.debug(f'init policy for filesystem {filesystem}: pool-id {metadata_pool_id}')
            fspolicy.init(dir_mapping, instances)
            self.pool_policy[filesystem] = fspolicy
        except rados.Error as e:
            log.error(f'failed to access pool-id {metadata_pool_id} for filesystem {filesystem}: {e}')
            raise Exception(-e.errno)

    def refresh_pool_policy_locked(self):
        filesystems = self.get_mirrored_filesystems()
        log.debug(f'refreshing policy for {filesystems}')
        for filesystem in list(self.pool_policy):
            if filesystem not in filesystems:
                log.info(f'shutdown pool policy for {filesystem}')
                fspolicy = self.pool_policy.pop(filesystem)
                fspolicy.shutdown()
        for filesystem in filesystems:
            if filesystem not in self.pool_policy:
                log.info(f'init pool policy for {filesystem}')
                self.init_pool_policy(filesystem)

    def refresh_pool_policy(self):
        with self.lock:
            self.refresh_pool_policy_locked()

    def enable_mirror(self, filesystem):
        log.info(f'enabling mirror for filesystem {filesystem}')
        try:
            with self.lock:
                metadata_pool_id = FSSnapshotMirror.get_metadata_pool(filesystem, self.fs_map)
                if not metadata_pool_id:
                    log.error(f'cannot find metadata pool-id for filesystem {filesystem}')
                    raise Exception(-errno.EINVAL)
                create_mirror_object(self.rados, metadata_pool_id)
                cmd = {'prefix': 'fs mirror enable', 'fs_name': filesystem}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to enable mirror failed: {err}')
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as me:
            return me.args[0], '', 'failed to enable mirroring'

    def disable_mirror(self, filesystem):
        log.info(f'disabling mirror for filesystem {filesystem}')
        try:
            with self.lock:
                cmd = {'prefix': 'fs mirror disable', 'fs_name': filesystem}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to disable mirror failed: {err}')
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to disable mirroring'

    def peer_list(self, filesystem):
        try:
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                peers = self.get_filesystem_peers(filesystem)
                peer_res = {}
                for peer_uuid, rem in peers.items():
                    conf = self.config_get(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid))
                    remote = rem['remote']
                    peer_res[peer_uuid] = {'client_name': remote['client_name'],
                                           'site_name': remote['cluster_name'],
                                           'fs_name': remote['fs_name']
                                           }
                    if 'mon_host' in conf:
                        peer_res[peer_uuid]['mon_host'] = conf['mon_host']
                return 0, json.dumps(peer_res), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to list peers'

    def peer_add(self, filesystem, remote_cluster_spec, remote_fs_name, remote_conf):
        try:
            if remote_fs_name is None:
                remote_fs_name = filesystem
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                ### peer updates for key, site-name are not yet supported
                if self.peer_exists(filesystem, remote_cluster_spec, remote_fs_name):
                    return 0, json.dumps({}), ''

                self.verify_and_set_mirror_info(filesystem, remote_cluster_spec, remote_fs_name, remote_conf)

                peer_uuid = str(uuid.uuid4())
                config_key = FSSnapshotMirror.peer_config_key(filesystem, peer_uuid)
                if remote_conf.get('mon_host') and remote_conf.get('key'):
                    self.config_set(config_key, json.dumps(remote_conf))
                cmd = {'prefix': 'fs mirror peer_add',
                       'fs_name': filesystem,
                       'uuid': peer_uuid,
                       'remote_cluster_spec': remote_cluster_spec,
                       'remote_fs_name': remote_fs_name}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to add peer failed: {err}')
                    try:
                        log.debug(f'cleaning up config-key for {peer_uuid}')
                        self.config_set(config_key)
                    except Exception:
                        pass
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to add peer'

    def peer_remove(self, filesystem, peer_uuid):
        try:
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                # ok, this is being a bit lazy. remove mirror info from peer followed
                # by purging the peer from fsmap. if the mirror daemon fs map updates
                # are laggy, they happily continue to synchronize. ideally, we should
                # purge the peer from fsmap here and purge mirror info on fsmap update
                # (in notify()). but that's not straightforward -- before purging mirror
                # info, we would need to wait for all mirror daemons to catch up with
                # fsmap updates. this involves mirror daemons sending the fsmap epoch
                # they have seen in reply to a notify request. TODO: fix this.
                self.purge_mirror_info(filesystem, peer_uuid)
                cmd = {'prefix': 'fs mirror peer_remove',
                       'fs_name': filesystem,
                       'uuid': peer_uuid}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to remove peer failed: {err}')
                    raise Exception(-errno.EINVAL)
                self.config_set(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid))
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to remove peer'

    def peer_bootstrap_create(self, fs_name, client_name, site_name):
        """create a bootstrap token for this peer filesystem"""
        try:
            cmd = {'prefix': 'fs authorize',
                   'filesystem': fs_name,
                   'entity': client_name,
                   'caps': ['/', 'rwps']}
            r, outs, err = self.mgr.mon_command(cmd)
            if r < 0:
                log.error(f'mon command to create peer user failed: {err}')
                raise Exception(-errno.EINVAL)
            cmd = {'prefix': 'auth get',
                   'entity': client_name,
                   'format': 'json'}
            r, outs, err = self.mgr.mon_command(cmd)
            if r < 0:
                log.error(f'mon command to fetch keyring failed: {err}')
                raise Exception(-errno.EINVAL)
            outs = json.loads(outs)
            outs0 = outs[0]
            token_dct = {'fsid': self.mgr.rados.get_fsid(),
                         'filesystem': fs_name,
                         'user': outs0['entity'],
                         'site_name': site_name,
                         'key': outs0['key'],
                         'mon_host': self.mgr.rados.conf_get('mon_host')}
            token_str = json.dumps(token_dct).encode('utf-8')
            encoded_token = base64.b64encode(token_str)
            return 0, json.dumps({'token': encoded_token.decode('utf-8')}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to bootstrap peer'

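    # The bootstrap token is just the base64 encoding of a JSON object carrying
    # the local cluster fsid, the filesystem and user entity authorized above,
    # the site name, the user's auth key and the cluster's mon_host string.
    # peer_bootstrap_import() below decodes it and feeds the remainder into
    # peer_add() as the remote connection config.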
    def peer_bootstrap_import(self, filesystem, token):
        try:
            token_str = base64.b64decode(token)
            token_dct = json.loads(token_str.decode('utf-8'))
        except Exception:
            return -errno.EINVAL, '', 'failed to parse token'
        client_name = token_dct.pop('user')
        cluster_name = token_dct.pop('site_name')
        remote_fs_name = token_dct.pop('filesystem')
        remote_cluster_spec = f'{client_name}@{cluster_name}'
        return self.peer_add(filesystem, remote_cluster_spec, remote_fs_name, token_dct)

    @staticmethod
    def norm_path(dir_path):
        if not os.path.isabs(dir_path):
            raise MirrorException(-errno.EINVAL, f'{dir_path} should be an absolute path')
        return os.path.normpath(dir_path)

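    # norm_path() only accepts absolute paths; e.g. '/a/./b//c/' normalizes to
    # '/a/b/c', while a relative path such as 'a/b' raises EINVAL.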
    def add_dir(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                log.debug(f'path normalized to {dir_path}')
                fspolicy.add_dir(dir_path)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to add directory'

    def remove_dir(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                fspolicy.remove_dir(dir_path)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to remove directory'

    def status(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                return fspolicy.status(dir_path)
        except MirrorException as me:
            return me.args[0], '', me.args[1]

    def show_distribution(self, filesystem):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                return fspolicy.summary()
        except MirrorException as me:
            return me.args[0], '', me.args[1]

    def daemon_status(self, filesystem):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                daemons = []
                sm = self.mgr.get('service_map')
                daemon_entry = sm['services'].get('cephfs-mirror', None)
                log.debug(f'daemon_entry: {daemon_entry}')
                for daemon_key in daemon_entry.get('daemons', []):
                    daemon_id = int(daemon_key)
                    daemon = {
                        'daemon_id': daemon_id,
                        'filesystems': []
                    }  # type: Dict[str, Any]
                    daemon_status = self.mgr.get_daemon_status('cephfs-mirror', daemon_key)
                    if not daemon_status:
                        log.debug(f'daemon status not yet available for cephfs-mirror daemon: {daemon_key}')
                        continue
                    status = json.loads(daemon_status['status_json'])
                    for fs_id, fs_desc in status.items():
                        fs = {'filesystem_id': int(fs_id),
                              'name': fs_desc['name'],
                              'directory_count': fs_desc['directory_count'],
                              'peers': []
                              }  # type: Dict[str, Any]
                        for peer_uuid, peer_desc in fs_desc['peers'].items():
                            peer = {'uuid': peer_uuid,
                                    'remote': peer_desc['remote'],
                                    'stats': peer_desc['stats']
                                    }
                            fs['peers'].append(peer)
                        daemon['filesystems'].append(fs)
                    daemons.append(daemon)
                return 0, json.dumps(daemons), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]

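    # daemon_status() returns, per running cephfs-mirror daemon, a JSON list of
    # entries shaped roughly like:
    #   {"daemon_id": <id>,
    #    "filesystems": [{"filesystem_id": <fscid>, "name": <fs>,
    #                     "directory_count": <n>,
    #                     "peers": [{"uuid": <peer uuid>, "remote": {...},
    #                                "stats": {...}}]}]}
    # as assembled above from each daemon's status_json.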