import base64
import errno
import json
import logging
import os
import re
import threading
import uuid

import cephfs
import rados

from mgr_util import RTimer, CephfsClient, open_filesystem, \
    CephfsConnectionException
from .blocklist import blocklist
from .notify import Notifier, InstanceWatcher
from .utils import INSTANCE_ID_PREFIX, MIRROR_OBJECT_NAME, Finisher, \
    AsyncOpTracker, connect_to_filesystem, disconnect_from_filesystem
from .exception import MirrorException
from .dir_map.create import create_mirror_object
from .dir_map.load import load_dir_map, load_instances
from .dir_map.update import UpdateDirMapRequest, UpdateInstanceRequest
from .dir_map.policy import Policy
from .dir_map.state_transition import ActionType

log = logging.getLogger(__name__)

CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL = 1
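
# A representative dir map entry, as built by process_updates(), add_dir()
# and remove_dir() below (values illustrative only):
#
#   '/d0': {
#       'version': 1,
#       'instance_id': '4161',        # cephfs-mirror daemon instance owning /d0
#       'last_shuffled': 1601565878.0,
#       'purging': 1,                 # present only while a removal is in flight
#   }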


class FSPolicy:
    class InstanceListener(InstanceWatcher.Listener):
        def __init__(self, fspolicy):
            self.fspolicy = fspolicy

        def handle_instances(self, added, removed):
            self.fspolicy.update_instances(added, removed)

    def __init__(self, mgr, ioctx):
        self.mgr = mgr
        self.ioctx = ioctx
        self.dir_paths = []
        self.policy = Policy()
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.async_requests = {}
        self.finisher = Finisher()
        self.op_tracker = AsyncOpTracker()
        self.notifier = Notifier(ioctx)
        self.instance_listener = FSPolicy.InstanceListener(self)
        self.instance_watcher = None
        self.stopping = threading.Event()
        self.timer_task = RTimer(CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL,
                                 self.process_updates)
        self.timer_task.start()

    def schedule_action(self, dir_paths):
        self.dir_paths.extend(dir_paths)

    def init(self, dir_mapping, instances):
        with self.lock:
            self.policy.init(dir_mapping)
            # we'll schedule action for all directories, so don't bother capturing
            # directory names here.
            self.policy.add_instances(list(instances.keys()), initial_update=True)
            self.instance_watcher = InstanceWatcher(self.ioctx, instances,
                                                    self.instance_listener)
            self.schedule_action(list(dir_mapping.keys()))

    def shutdown(self):
        with self.lock:
            log.debug('FSPolicy.shutdown')
            self.stopping.set()
            log.debug('canceling update timer task')
            self.timer_task.cancel()
            log.debug('update timer task canceled')
        if self.instance_watcher:
            log.debug('stopping instance watcher')
            self.instance_watcher.wait_and_stop()
            log.debug('instance watcher stopped')
        self.op_tracker.wait_for_ops()
        log.debug('FSPolicy.shutdown done')

    def handle_update_mapping(self, updates, removals, request_id, callback, r):
        log.info(f'handle_update_mapping: {updates} {removals} {request_id} {callback} {r}')
        with self.lock:
            try:
                self.async_requests.pop(request_id)
                if callback:
                    callback(updates, removals, r)
            finally:
                self.op_tracker.finish_async_op()

    def handle_update_instances(self, instances_added, instances_removed, request_id, r):
        log.info(f'handle_update_instances: {instances_added} {instances_removed} {request_id} {r}')
        with self.lock:
            try:
                self.async_requests.pop(request_id)
                if self.stopping.is_set():
                    log.debug('handle_update_instances: policy shutting down')
                    return
                schedules = []
                if instances_removed:
                    schedules.extend(self.policy.remove_instances(instances_removed))
                if instances_added:
                    schedules.extend(self.policy.add_instances(instances_added))
                self.schedule_action(schedules)
            finally:
                self.op_tracker.finish_async_op()

    def update_mapping(self, update_map, removals, callback=None):
        log.info(f'updating directory map: {len(update_map)}+{len(removals)} updates')
        request_id = str(uuid.uuid4())
        def async_callback(r):
            self.finisher.queue(self.handle_update_mapping,
                                [list(update_map.keys()), removals, request_id, callback, r])
        request = UpdateDirMapRequest(self.ioctx, update_map.copy(), removals.copy(), async_callback)
        self.async_requests[request_id] = request
        self.op_tracker.start_async_op()
        log.debug(f'async request_id: {request_id}')
        request.send()
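
    # Update flow (for orientation): update_mapping() fires an omap update via
    # UpdateDirMapRequest; its completion queues handle_update_mapping() on the
    # finisher thread, which pops the pending request and invokes the caller's
    # callback. The op_tracker brackets the whole round trip so shutdown() can
    # drain in-flight updates before tearing the policy down.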

    def update_instances(self, added, removed):
        log.debug(f'update_instances: added={added}, removed={removed}')
        for instance_id, addr in removed.items():
            log.info(f'blocklisting instance_id: {instance_id} addr: {addr}')
            blocklist(self.mgr, addr)
        with self.lock:
            instances_added = {}
            instances_removed = []
            for instance_id, addr in added.items():
                instances_added[instance_id] = {'version': 1, 'addr': addr}
            instances_removed = list(removed.keys())
            request_id = str(uuid.uuid4())
            def async_callback(r):
                self.finisher.queue(self.handle_update_instances,
                                    [list(instances_added.keys()), instances_removed, request_id, r])
            # blocklisted instances can be removed at this point. remapping directories
            # mapped to blocklisted instances on module startup is handled in policy
            # init().
            request = UpdateInstanceRequest(self.ioctx, instances_added.copy(),
                                            instances_removed.copy(), async_callback)
            self.async_requests[request_id] = request
            log.debug(f'async request_id: {request_id}')
            self.op_tracker.start_async_op()
            request.send()

    def continue_action(self, updates, removals, r):
        log.debug(f'continuing action: {updates}+{removals} r={r}')
        if self.stopping.is_set():
            log.debug('continue_action: policy shutting down')
            return
        schedules = []
        for dir_path in updates:
            schedule = self.policy.finish_action(dir_path, r)
            if schedule:
                schedules.append(dir_path)
        for dir_path in removals:
            schedule = self.policy.finish_action(dir_path, r)
            if schedule:
                schedules.append(dir_path)
        self.schedule_action(schedules)

    def handle_peer_ack(self, dir_path, r):
        log.info(f'handle_peer_ack: {dir_path} r={r}')
        with self.lock:
            try:
                if self.stopping.is_set():
                    log.debug('handle_peer_ack: policy shutting down')
                    return
                self.continue_action([dir_path], [], r)
            finally:
                self.op_tracker.finish_async_op()

    def process_updates(self):
        def acquire_message(dir_path):
            return json.dumps({'dir_path': dir_path,
                               'mode': 'acquire'
                               })
        def release_message(dir_path):
            return json.dumps({'dir_path': dir_path,
                               'mode': 'release'
                               })
        with self.lock:
            if not self.dir_paths or self.stopping.is_set():
                return
            update_map = {}
            removals = []
            notifies = {}
            for dir_path in self.dir_paths:
                action_type = self.policy.start_action(dir_path)
                lookup_info = self.policy.lookup(dir_path)
                log.debug(f'processing action: dir_path: {dir_path}, lookup_info: {lookup_info}, action_type: {action_type}')
                if action_type == ActionType.NONE:
                    continue
                elif action_type == ActionType.MAP_UPDATE:
                    # take care to not overwrite purge status
                    update_map[dir_path] = {'version': 1,
                                            'instance_id': lookup_info['instance_id'],
                                            'last_shuffled': lookup_info['mapped_time']
                                            }
                    if lookup_info['purging']:
                        update_map[dir_path]['purging'] = 1
                elif action_type == ActionType.MAP_REMOVE:
                    removals.append(dir_path)
                elif action_type == ActionType.ACQUIRE:
                    notifies[dir_path] = (lookup_info['instance_id'], acquire_message(dir_path))
                elif action_type == ActionType.RELEASE:
                    notifies[dir_path] = (lookup_info['instance_id'], release_message(dir_path))
            if update_map or removals:
                self.update_mapping(update_map, removals, callback=self.continue_action)
            for dir_path, message in notifies.items():
                self.op_tracker.start_async_op()
                self.notifier.notify(dir_path, message, self.handle_peer_ack)
            self.dir_paths.clear()
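
    # The notify payloads above are plain JSON, e.g. (assuming the 'mode'
    # discriminator reconstructed in the message builders above):
    #
    #   {"dir_path": "/d0", "mode": "acquire"}
    #   {"dir_path": "/d0", "mode": "release"}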

    def add_dir(self, dir_path):
        with self.lock:
            lookup_info = self.policy.lookup(dir_path)
            if lookup_info:
                if lookup_info['purging']:
                    raise MirrorException(-errno.EAGAIN, f'remove in-progress for {dir_path}')
                raise MirrorException(-errno.EEXIST, f'directory {dir_path} is already tracked')
            schedule = self.policy.add_dir(dir_path)
            if not schedule:
                return
            update_map = {dir_path: {'version': 1, 'instance_id': '', 'last_shuffled': 0.0}}
            updated = False
            def update_safe(updates, removals, r):
                nonlocal updated
                updated = True
                self.cond.notify_all()
            self.update_mapping(update_map, [], callback=update_safe)
            self.cond.wait_for(lambda: updated)
            self.schedule_action([dir_path])
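
    # Note: cond.wait_for() drops self.lock while waiting, which is what lets
    # the finisher thread run handle_update_mapping() (and hence update_safe())
    # under the same lock and wake this caller up.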

    def remove_dir(self, dir_path):
        with self.lock:
            lookup_info = self.policy.lookup(dir_path)
            if not lookup_info:
                raise MirrorException(-errno.ENOENT, f'directory {dir_path} is not tracked')
            if lookup_info['purging']:
                raise MirrorException(-errno.EINVAL, f'directory {dir_path} is under removal')
            update_map = {dir_path: {'version': 1,
                                     'instance_id': lookup_info['instance_id'],
                                     'last_shuffled': lookup_info['mapped_time'],
                                     'purging': 1
                                     }}
            updated = False
            sync_lock = threading.Lock()
            sync_cond = threading.Condition(sync_lock)
            def update_safe(r):
                with sync_lock:
                    nonlocal updated
                    updated = True
                    sync_cond.notify_all()
            request = UpdateDirMapRequest(self.ioctx, update_map.copy(), [], update_safe)
            request.send()
            with sync_lock:
                sync_cond.wait_for(lambda: updated)
            schedule = self.policy.remove_dir(dir_path)
            if schedule:
                self.schedule_action([dir_path])

    def status(self, dir_path):
        with self.lock:
            res = self.policy.dir_status(dir_path)
            return 0, json.dumps(res, indent=4, sort_keys=True), ''

    def summary(self):
        with self.lock:
            res = self.policy.instance_summary()
            return 0, json.dumps(res, indent=4, sort_keys=True), ''


class FSSnapshotMirror:
    PEER_CONFIG_KEY_PREFIX = "cephfs/mirror/peer"

    def __init__(self, mgr):
        self.mgr = mgr
        self.rados = mgr.rados
        self.pool_policy = {}
        self.fs_map = self.mgr.get('fs_map')
        self.lock = threading.Lock()
        self.refresh_pool_policy()
        self.local_fs = CephfsClient(mgr)

    def notify(self, notify_type):
        log.debug(f'got notify type {notify_type}')
        if notify_type == 'fs_map':
            with self.lock:
                self.fs_map = self.mgr.get('fs_map')
                self.refresh_pool_policy_locked()

    @staticmethod
    def make_spec(client_name, cluster_name):
        return f'{client_name}@{cluster_name}'

    @staticmethod
    def split_spec(spec):
        try:
            client_id, cluster_name = spec.split('@')
            _, client_name = client_id.split('.')
            return client_name, cluster_name
        except ValueError:
            raise MirrorException(-errno.EINVAL, f'invalid cluster spec {spec}')
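
    # Illustrative round trip: split_spec() expects a 'client.<name>@<cluster>'
    # spec and strips the 'client.' prefix, e.g.
    #
    #   >>> FSSnapshotMirror.split_spec('client.mirror_remote@backup')
    #   ('mirror_remote', 'backup')
    #   >>> FSSnapshotMirror.make_spec('client.mirror_remote', 'backup')
    #   'client.mirror_remote@backup'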

    @staticmethod
    def get_metadata_pool(filesystem, fs_map):
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['mdsmap']['metadata_pool']
        return None

    @staticmethod
    def get_filesystem_id(filesystem, fs_map):
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['id']
        return None

    @staticmethod
    def peer_config_key(filesystem, peer_uuid):
        return f'{FSSnapshotMirror.PEER_CONFIG_KEY_PREFIX}/{filesystem}/{peer_uuid}'
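
    # e.g. peer_config_key('a', 'e3739ebb-...') yields the mon config-key
    # 'cephfs/mirror/peer/a/e3739ebb-...' under which the peer's mon_host/key
    # bootstrap credentials are stashed by peer_add().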

    def config_set(self, key, val=None):
        """set or remove a key from mon config store"""
        if val:
            cmd = {'prefix': 'config-key set',
                   'key': key, 'val': val}
        else:
            cmd = {'prefix': 'config-key rm',
                   'key': key}
        r, outs, err = self.mgr.mon_command(cmd)
        if r < 0:
            log.error(f'mon command to set/remove config-key {key} failed: {err}')
            raise Exception(-errno.EINVAL)

    def config_get(self, key):
        """fetch a config key value from mon config store"""
        cmd = {'prefix': 'config-key get', 'key': key}
        r, outs, err = self.mgr.mon_command(cmd)
        if r < 0 and not r == -errno.ENOENT:
            log.error(f'mon command to get config-key {key} failed: {err}')
            raise Exception(-errno.EINVAL)
        val = {}
        if r == 0:
            val = json.loads(outs)
        return val

    def filesystem_exist(self, filesystem):
        for fs in self.fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return True
        return False

    def get_mirrored_filesystems(self):
        return [fs['mdsmap']['fs_name'] for fs in self.fs_map['filesystems'] if fs.get('mirror_info', None)]

    def get_filesystem_peers(self, filesystem):
        """To be used when mirroring is enabled for the filesystem"""
        for fs in self.fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['mirror_info']['peers']
        return None

    def peer_exists(self, filesystem, remote_cluster_spec, remote_fs_name):
        peers = self.get_filesystem_peers(filesystem)
        for _, rem in peers.items():
            remote = rem['remote']
            spec = FSSnapshotMirror.make_spec(remote['client_name'], remote['cluster_name'])
            if spec == remote_cluster_spec and remote['fs_name'] == remote_fs_name:
                return True
        return False

    def get_mirror_info(self, remote_fs):
        try:
            val = remote_fs.getxattr('/', 'ceph.mirror.info')
            match = re.search(r'^cluster_id=([a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}) fs_id=(\d+)$',
                              val.decode('utf-8'))
            if match and len(match.groups()) == 2:
                return {'cluster_id': match.group(1),
                        'fs_id': int(match.group(2))
                        }
            raise MirrorException(-errno.EINVAL, 'invalid ceph.mirror.info value')
        except cephfs.Error as e:
            raise MirrorException(-e.errno, 'error fetching ceph.mirror.info xattr')

    def set_mirror_info(self, local_cluster_id, local_fsid, remote_fs):
        log.info(f'setting {local_cluster_id}::{local_fsid} on remote')
        try:
            remote_fs.setxattr('/', 'ceph.mirror.info',
                               f'cluster_id={local_cluster_id} fs_id={local_fsid}'.encode('utf-8'), os.XATTR_CREATE)
        except cephfs.Error as e:
            if e.errno == errno.EEXIST:
                mi = self.get_mirror_info(remote_fs)
                if not mi:
                    log.error('error fetching mirror info when setting mirror info')
                    raise Exception(-errno.EINVAL)
                cluster_id = mi['cluster_id']
                fs_id = mi['fs_id']
                if not (cluster_id == local_cluster_id and fs_id == local_fsid):
                    raise MirrorException(-errno.EEXIST, f'peer mirrored by: (cluster_id: {cluster_id}, fs_id: {fs_id})')
            else:
                log.error(f'error setting mirrored fsid: {e}')
                raise Exception(-e.errno)
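
    # The marker written above is a plain-text xattr on the remote filesystem
    # root; an illustrative value:
    #
    #   ceph.mirror.info = "cluster_id=e5a902fa-... fs_id=2"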

    def resolve_peer(self, fs_name, peer_uuid):
        peers = self.get_filesystem_peers(fs_name)
        for peer, rem in peers.items():
            if peer == peer_uuid:
                return rem['remote']
        return None

    def purge_mirror_info(self, local_fs_name, peer_uuid):
        log.debug(f'local fs={local_fs_name} peer_uuid={peer_uuid}')
        # resolve the peer to its spec
        rem = self.resolve_peer(local_fs_name, peer_uuid)
        if not rem:
            return
        log.debug(f'peer_uuid={peer_uuid} resolved to {rem}')
        _, client_name = rem['client_name'].split('.')

        # fetch auth details from config store
        remote_conf = self.config_get(FSSnapshotMirror.peer_config_key(local_fs_name, peer_uuid))
        remote_cluster, remote_fs = connect_to_filesystem(client_name,
                                                          rem['cluster_name'],
                                                          rem['fs_name'], 'remote', conf_dct=remote_conf)
        try:
            remote_fs.removexattr('/', 'ceph.mirror.info')
        except cephfs.Error as e:
            if not e.errno == errno.ENOENT:
                log.error('error removing mirror info')
                raise Exception(-e.errno)
        finally:
            disconnect_from_filesystem(rem['cluster_name'], rem['fs_name'], remote_cluster, remote_fs)

    def verify_and_set_mirror_info(self, local_fs_name, remote_cluster_spec, remote_fs_name, remote_conf={}):
        log.debug(f'local fs={local_fs_name} remote={remote_cluster_spec}/{remote_fs_name}')

        client_name, cluster_name = FSSnapshotMirror.split_spec(remote_cluster_spec)
        remote_cluster, remote_fs = connect_to_filesystem(client_name, cluster_name, remote_fs_name,
                                                          'remote', conf_dct=remote_conf)
        if 'fsid' in remote_conf:
            if not remote_cluster.get_fsid() == remote_conf['fsid']:
                raise MirrorException(-errno.EINVAL, 'FSID mismatch between bootstrap token and remote cluster')
        try:
            local_fsid = FSSnapshotMirror.get_filesystem_id(local_fs_name, self.fs_map)
            if local_fsid is None:
                log.error(f'error looking up filesystem id for {local_fs_name}')
                raise Exception(-errno.EINVAL)

            # post cluster-id comparison, filesystem name comparison would suffice
            local_cluster_id = self.rados.get_fsid()
            remote_cluster_id = remote_cluster.get_fsid()
            log.debug(f'local_cluster_id={local_cluster_id} remote_cluster_id={remote_cluster_id}')
            if local_cluster_id == remote_cluster_id and local_fs_name == remote_fs_name:
                raise MirrorException(-errno.EINVAL, "source and destination cluster fsid and "
                                      "file-system name can't be the same")

            self.set_mirror_info(local_cluster_id, local_fsid, remote_fs)
        finally:
            disconnect_from_filesystem(cluster_name, remote_fs_name, remote_cluster, remote_fs)

    def init_pool_policy(self, filesystem):
        metadata_pool_id = FSSnapshotMirror.get_metadata_pool(filesystem, self.fs_map)
        if not metadata_pool_id:
            log.error(f'cannot find metadata pool-id for filesystem {filesystem}')
            raise Exception(-errno.EINVAL)
        try:
            ioctx = self.rados.open_ioctx2(metadata_pool_id)
            # TODO: make async if required
            dir_mapping = load_dir_map(ioctx)
            instances = load_instances(ioctx)
            fspolicy = FSPolicy(self.mgr, ioctx)
            log.debug(f'init policy for filesystem {filesystem}: pool-id {metadata_pool_id}')
            fspolicy.init(dir_mapping, instances)
            self.pool_policy[filesystem] = fspolicy
        except rados.Error as e:
            log.error(f'failed to access pool-id {metadata_pool_id} for filesystem {filesystem}: {e}')
            raise Exception(-e.errno)

    def refresh_pool_policy_locked(self):
        filesystems = self.get_mirrored_filesystems()
        log.debug(f'refreshing policy for {filesystems}')
        for filesystem in list(self.pool_policy):
            if filesystem not in filesystems:
                log.info(f'shutdown pool policy for {filesystem}')
                fspolicy = self.pool_policy.pop(filesystem)
                fspolicy.shutdown()
        for filesystem in filesystems:
            if filesystem not in self.pool_policy:
                log.info(f'init pool policy for {filesystem}')
                self.init_pool_policy(filesystem)

    def refresh_pool_policy(self):
        with self.lock:
            self.refresh_pool_policy_locked()

    def enable_mirror(self, filesystem):
        log.info(f'enabling mirror for filesystem {filesystem}')
        with self.lock:
            try:
                metadata_pool_id = FSSnapshotMirror.get_metadata_pool(filesystem, self.fs_map)
                if not metadata_pool_id:
                    log.error(f'cannot find metadata pool-id for filesystem {filesystem}')
                    raise Exception(-errno.EINVAL)
                create_mirror_object(self.rados, metadata_pool_id)
                cmd = {'prefix': 'fs mirror enable', 'fs_name': filesystem}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to enable mirror failed: {err}')
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
            except MirrorException as me:
                return me.args[0], '', me.args[1]
            except Exception as e:
                return e.args[0], '', 'failed to enable mirroring'

    def disable_mirror(self, filesystem):
        log.info(f'disabling mirror for filesystem {filesystem}')
        try:
            with self.lock:
                cmd = {'prefix': 'fs mirror disable', 'fs_name': filesystem}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to disable mirror failed: {err}')
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to disable mirroring'

    def peer_list(self, filesystem):
        try:
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                peers = self.get_filesystem_peers(filesystem)
                peer_res = {}
                for peer_uuid, rem in peers.items():
                    conf = self.config_get(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid))
                    remote = rem['remote']
                    peer_res[peer_uuid] = {'client_name': remote['client_name'],
                                           'site_name': remote['cluster_name'],
                                           'fs_name': remote['fs_name']
                                           }
                    if 'mon_host' in conf:
                        peer_res[peer_uuid]['mon_host'] = conf['mon_host']
                return 0, json.dumps(peer_res), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to list peers'

    def peer_add(self, filesystem, remote_cluster_spec, remote_fs_name, remote_conf):
        try:
            if remote_fs_name is None:
                remote_fs_name = filesystem
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                ### peer updates for key, site-name are not yet supported
                if self.peer_exists(filesystem, remote_cluster_spec, remote_fs_name):
                    return 0, json.dumps({}), ''
                self.verify_and_set_mirror_info(filesystem, remote_cluster_spec, remote_fs_name, remote_conf)
                peer_uuid = str(uuid.uuid4())
                config_key = FSSnapshotMirror.peer_config_key(filesystem, peer_uuid)
                if remote_conf.get('mon_host') and remote_conf.get('key'):
                    self.config_set(config_key, json.dumps(remote_conf))
                cmd = {'prefix': 'fs mirror peer_add',
                       'fs_name': filesystem,
                       'uuid': peer_uuid,
                       'remote_cluster_spec': remote_cluster_spec,
                       'remote_fs_name': remote_fs_name}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to add peer failed: {err}')
                    try:
                        log.debug(f'cleaning up config-key for {peer_uuid}')
                        self.config_set(config_key)
                    except Exception:
                        pass
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to add peer'

    def peer_remove(self, filesystem, peer_uuid):
        try:
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                # ok, this is being a bit lazy. remove mirror info from the peer followed
                # by purging the peer from the fsmap. if a mirror daemon's fsmap updates
                # are laggy, it will happily continue to synchronize. ideally, we should
                # purge the peer from the fsmap here and purge mirror info on the fsmap
                # update (in notify()). but that's not straightforward -- before purging
                # mirror info, we would need to wait for all mirror daemons to catch up
                # with fsmap updates. this involves mirror daemons sending the fsmap
                # epoch they have seen in reply to a notify request. TODO: fix this.
                self.purge_mirror_info(filesystem, peer_uuid)
                cmd = {'prefix': 'fs mirror peer_remove',
                       'fs_name': filesystem,
                       'uuid': peer_uuid}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to remove peer failed: {err}')
                    raise Exception(-errno.EINVAL)
                self.config_set(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid))
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to remove peer'

    def peer_bootstrap_create(self, fs_name, client_name, site_name):
        """create a bootstrap token for this peer filesystem"""
        try:
            with self.lock:
                cmd = {'prefix': 'fs authorize',
                       'filesystem': fs_name,
                       'entity': client_name,
                       'caps': ['/', 'rwps']}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to create peer user failed: {err}')
                    raise Exception(-errno.EINVAL)
                cmd = {'prefix': 'auth get',
                       'entity': client_name,
                       'format': 'json'}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to fetch keyring failed: {err}')
                    raise Exception(-errno.EINVAL)
                outs = json.loads(outs)
                outs0 = outs[0]
                token_dct = {'fsid': self.mgr.rados.get_fsid(),
                             'filesystem': fs_name,
                             'user': outs0['entity'],
                             'site_name': site_name,
                             'key': outs0['key'],
                             'mon_host': self.mgr.rados.conf_get('mon_host')}
                token_str = json.dumps(token_dct).encode('utf-8')
                encoded_token = base64.b64encode(token_str)
                return 0, json.dumps({'token': encoded_token.decode('utf-8')}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to bootstrap peer'
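
    # A bootstrap token is just base64-encoded JSON. Decoded, it carries the
    # fields assembled above (values illustrative):
    #
    #   {"fsid": "e5a902fa-...", "filesystem": "backup_fs",
    #    "user": "client.mirror_remote", "site_name": "site-remote",
    #    "key": "AQD...", "mon_host": "10.1.2.3:6789"}
    #
    # peer_bootstrap_import() below reverses this and hands the leftover
    # connection fields (key, mon_host, fsid) to peer_add() as remote_conf.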

    def peer_bootstrap_import(self, filesystem, token):
        try:
            token_str = base64.b64decode(token)
            token_dct = json.loads(token_str.decode('utf-8'))
        except Exception:
            return -errno.EINVAL, '', 'failed to parse token'
        client_name = token_dct.pop('user')
        cluster_name = token_dct.pop('site_name')
        remote_fs_name = token_dct.pop('filesystem')
        remote_cluster_spec = f'{client_name}@{cluster_name}'
        return self.peer_add(filesystem, remote_cluster_spec, remote_fs_name, token_dct)

    @staticmethod
    def norm_path(dir_path):
        if not os.path.isabs(dir_path):
            raise MirrorException(-errno.EINVAL, f'{dir_path} should be an absolute path')
        return os.path.normpath(dir_path)
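
    # e.g. norm_path('/d0/../d1/') returns '/d1'; a relative path such as
    # 'd0/d1' raises MirrorException(-errno.EINVAL, ...).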

    def add_dir(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                log.debug(f'path normalized to {dir_path}')
                fspolicy.add_dir(dir_path)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to add directory'

    def remove_dir(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                fspolicy.remove_dir(dir_path)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to remove directory'

    def status(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                return fspolicy.status(dir_path)
        except MirrorException as me:
            return me.args[0], '', me.args[1]

    def show_distribution(self, filesystem):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                return fspolicy.summary()
        except MirrorException as me:
            return me.args[0], '', me.args[1]

    def daemon_status(self, filesystem):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                daemons = {}
                sm = self.mgr.get('service_map')
                daemon_entry = sm['services'].get('cephfs-mirror', None)
                if daemon_entry:
                    for daemon_key in daemon_entry['daemons']:
                        try:
                            daemon_id = int(daemon_key)
                        except ValueError:
                            # skip non-daemon keys in the service map
                            continue
                        daemon_status = self.mgr.get_daemon_status('cephfs-mirror', daemon_key)
                        if not daemon_status:
                            # temporary, should get updated soon
                            log.debug(f'daemon status not yet available for daemon_id {daemon_id}')
                            continue
                        try:
                            daemons[daemon_id] = json.loads(daemon_status['status_json'])
                        except Exception:
                            # temporary, should get updated soon
                            log.debug(f'daemon status not yet available for daemon_id {daemon_id}')
                return 0, json.dumps(daemons), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]