from typing import Dict, Any

from mgr_util import RTimer, CephfsClient, open_filesystem, \
    CephfsConnectionException
from mgr_module import NotifyType
from .blocklist import blocklist
from .notify import Notifier, InstanceWatcher
from .utils import INSTANCE_ID_PREFIX, MIRROR_OBJECT_NAME, Finisher, \
    AsyncOpTracker, connect_to_filesystem, disconnect_from_filesystem
from .exception import MirrorException
from .dir_map.create import create_mirror_object
from .dir_map.load import load_dir_map, load_instances
from .dir_map.update import UpdateDirMapRequest, UpdateInstanceRequest
from .dir_map.policy import Policy
from .dir_map.state_transition import ActionType

log = logging.getLogger(__name__)
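
# queued directory actions are batched and processed at most once per this
# interval (seconds) by the policy update timer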
CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL = 1
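

# FSPolicy manages directory-to-instance assignment for a single mirrored
# filesystem: it persists the directory map in the filesystem's metadata pool
# and notifies cephfs-mirror instances to acquire or release directories.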
class FSPolicy:
    class InstanceListener(InstanceWatcher.Listener):
        def __init__(self, fspolicy):
            self.fspolicy = fspolicy

        def handle_instances(self, added, removed):
            self.fspolicy.update_instances(added, removed)

    def __init__(self, mgr, ioctx):
        self.mgr = mgr
        self.ioctx = ioctx
        self.policy = Policy()
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.dir_paths = []
        self.async_requests = {}
        self.finisher = Finisher()
        self.op_tracker = AsyncOpTracker()
        self.notifier = Notifier(ioctx)
        self.instance_listener = FSPolicy.InstanceListener(self)
        self.instance_watcher = None
        self.stopping = threading.Event()
        self.timer_task = RTimer(CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL,
                                 self.process_updates)
        self.timer_task.start()

    def schedule_action(self, dir_paths):
        self.dir_paths.extend(dir_paths)

    def init(self, dir_mapping, instances):
        with self.lock:
            self.policy.init(dir_mapping)
            # we'll schedule action for all directories, so don't bother capturing
            # directory names here.
            self.policy.add_instances(list(instances.keys()), initial_update=True)
            self.instance_watcher = InstanceWatcher(self.ioctx, instances,
                                                    self.instance_listener)
            self.schedule_action(list(dir_mapping.keys()))

    def shutdown(self):
        with self.lock:
            log.debug('FSPolicy.shutdown')
            self.stopping.set()
            log.debug('canceling update timer task')
            self.timer_task.cancel()
            log.debug('update timer task canceled')
        if self.instance_watcher:
            log.debug('stopping instance watcher')
            self.instance_watcher.wait_and_stop()
            log.debug('instance watcher stopped')
        self.op_tracker.wait_for_ops()
        log.debug('FSPolicy.shutdown done')
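
    # Map and instance updates are submitted asynchronously; the rados
    # completion only queues the handlers below on the finisher, so the real
    # work runs in the finisher's thread context rather than inside the
    # completion callback.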
    def handle_update_mapping(self, updates, removals, request_id, callback, r):
        log.info(f'handle_update_mapping: {updates} {removals} {request_id} {callback} {r}')
        with self.lock:
            try:
                self.async_requests.pop(request_id)
                if callback:
                    callback(updates, removals, r)
            finally:
                self.op_tracker.finish_async_op()

    def handle_update_instances(self, instances_added, instances_removed, request_id, r):
        log.info(f'handle_update_instances: {instances_added} {instances_removed} {request_id} {r}')
        with self.lock:
            try:
                self.async_requests.pop(request_id)
                if self.stopping.is_set():
                    log.debug('handle_update_instances: policy shutting down')
                    return
                schedules = []
                if instances_removed:
                    schedules.extend(self.policy.remove_instances(instances_removed))
                if instances_added:
                    schedules.extend(self.policy.add_instances(instances_added))
                self.schedule_action(schedules)
            finally:
                self.op_tracker.finish_async_op()

    def update_mapping(self, update_map, removals, callback=None):
        log.info(f'updating directory map: {len(update_map)}+{len(removals)} updates')
        request_id = str(uuid.uuid4())
        def async_callback(r):
            self.finisher.queue(self.handle_update_mapping,
                                [list(update_map.keys()), removals, request_id, callback, r])
        request = UpdateDirMapRequest(self.ioctx, update_map.copy(), removals.copy(), async_callback)
        self.async_requests[request_id] = request
        self.op_tracker.start_async_op()
        log.debug(f'async request_id: {request_id}')
        request.send()

    def update_instances(self, added, removed):
        log.debug(f'update_instances: added={added}, removed={removed}')
        for instance_id, addr in removed.items():
            log.info(f'blocklisting instance_id: {instance_id} addr: {addr}')
            blocklist(self.mgr, addr)
        instances_added = {}
        instances_removed = []
        for instance_id, addr in added.items():
            instances_added[instance_id] = {'version': 1, 'addr': addr}
        instances_removed = list(removed.keys())
        request_id = str(uuid.uuid4())
        def async_callback(r):
            self.finisher.queue(self.handle_update_instances,
                                [list(instances_added.keys()), instances_removed, request_id, r])
        # blocklisted instances can be removed at this point. remapping directories
        # mapped to blocklisted instances on module startup is handled in policy
        # init.
        request = UpdateInstanceRequest(self.ioctx, instances_added.copy(),
                                        instances_removed.copy(), async_callback)
        self.async_requests[request_id] = request
        log.debug(f'async request_id: {request_id}')
        self.op_tracker.start_async_op()
        request.send()

    def continue_action(self, updates, removals, r):
        log.debug(f'continuing action: {updates}+{removals} r={r}')
        if self.stopping.is_set():
            log.debug('continue_action: policy shutting down')
            return
        schedules = []
        for dir_path in updates:
            schedule = self.policy.finish_action(dir_path, r)
            if schedule:
                schedules.append(dir_path)
        for dir_path in removals:
            schedule = self.policy.finish_action(dir_path, r)
            if schedule:
                schedules.append(dir_path)
        self.schedule_action(schedules)

    def handle_peer_ack(self, dir_path, r):
        log.info(f'handle_peer_ack: {dir_path} r={r}')
        with self.lock:
            try:
                if self.stopping.is_set():
                    log.debug('handle_peer_ack: policy shutting down')
                    return
                self.continue_action([dir_path], [], r)
            finally:
                self.op_tracker.finish_async_op()
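
    # Driven by the update timer (see __init__): drains the queued directory
    # paths and turns pending policy actions into directory-map
    # updates/removals and acquire/release notifications to mirror daemon
    # instances.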
    def process_updates(self):
        def acquire_message(dir_path):
            return json.dumps({'dir_path': dir_path,
                               'mode': 'acquire'
                               })
        def release_message(dir_path):
            return json.dumps({'dir_path': dir_path,
                               'mode': 'release'
                               })

        with self.lock:
            if not self.dir_paths or self.stopping.is_set():
                return

            update_map = {}
            removals = []
            notifies = {}
            for dir_path in self.dir_paths:
                action_type = self.policy.start_action(dir_path)
                lookup_info = self.policy.lookup(dir_path)
                log.debug(f'processing action: dir_path: {dir_path}, lookup_info: {lookup_info}, action_type: {action_type}')
                if action_type == ActionType.NONE:
                    continue
                elif action_type == ActionType.MAP_UPDATE:
                    # take care to not overwrite purge status
                    update_map[dir_path] = {'version': 1,
                                            'instance_id': lookup_info['instance_id'],
                                            'last_shuffled': lookup_info['mapped_time']
                                            }
                    if lookup_info['purging']:
                        update_map[dir_path]['purging'] = 1
                elif action_type == ActionType.MAP_REMOVE:
                    removals.append(dir_path)
                elif action_type == ActionType.ACQUIRE:
                    notifies[dir_path] = (lookup_info['instance_id'], acquire_message(dir_path))
                elif action_type == ActionType.RELEASE:
                    notifies[dir_path] = (lookup_info['instance_id'], release_message(dir_path))
            if update_map or removals:
                self.update_mapping(update_map, removals, callback=self.continue_action)
            for dir_path, message in notifies.items():
                self.op_tracker.start_async_op()
                self.notifier.notify(dir_path, message, self.handle_peer_ack)
            self.dir_paths.clear()

    def add_dir(self, dir_path):
        with self.lock:
            lookup_info = self.policy.lookup(dir_path)
            if lookup_info:
                if lookup_info['purging']:
                    raise MirrorException(-errno.EAGAIN, f'remove in-progress for {dir_path}')
                raise MirrorException(-errno.EEXIST, f'directory {dir_path} is already tracked')
            schedule = self.policy.add_dir(dir_path)
            if not schedule:
                return
            update_map = {dir_path: {'version': 1, 'instance_id': '', 'last_shuffled': 0.0}}
            updated = False
            def update_safe(updates, removals, r):
                nonlocal updated
                updated = True
                self.cond.notifyAll()
            self.update_mapping(update_map, [], callback=update_safe)
            self.cond.wait_for(lambda: updated)
            self.schedule_action([dir_path])
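
    # Removal is two-phase: the directory is first marked for purge in the
    # persistent map (synchronously, below), and only then does the policy
    # schedule the release/remove action for it.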
    def remove_dir(self, dir_path):
        with self.lock:
            lookup_info = self.policy.lookup(dir_path)
            if not lookup_info:
                raise MirrorException(-errno.ENOENT, f'directory {dir_path} is not tracked')
            if lookup_info['purging']:
                raise MirrorException(-errno.EINVAL, f'directory {dir_path} is under removal')
            update_map = {dir_path: {'version': 1,
                                     'instance_id': lookup_info['instance_id'],
                                     'last_shuffled': lookup_info['mapped_time'],
                                     'purging': 1}}
            updated = False
            sync_lock = threading.Lock()
            sync_cond = threading.Condition(sync_lock)
            def update_safe(r):
                with sync_lock:
                    nonlocal updated
                    updated = True
                    sync_cond.notifyAll()
            request = UpdateDirMapRequest(self.ioctx, update_map.copy(), [], update_safe)
            request.send()
            with sync_lock:
                sync_cond.wait_for(lambda: updated)
            schedule = self.policy.remove_dir(dir_path)
            if schedule:
                self.schedule_action([dir_path])

    def status(self, dir_path):
        with self.lock:
            res = self.policy.dir_status(dir_path)
            return 0, json.dumps(res, indent=4, sort_keys=True), ''

    def summary(self):
        with self.lock:
            res = self.policy.instance_summary()
            return 0, json.dumps(res, indent=4, sort_keys=True), ''
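

# FSSnapshotMirror is the module-facing interface: it watches the FSMap for
# mirror-enabled filesystems and maintains one FSPolicy per mirrored
# filesystem (keyed by name in self.pool_policy).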
class FSSnapshotMirror:
    PEER_CONFIG_KEY_PREFIX = "cephfs/mirror/peer"

    def __init__(self, mgr):
        self.mgr = mgr
        self.rados = mgr.rados
        self.pool_policy = {}
        self.fs_map = self.mgr.get('fs_map')
        self.lock = threading.Lock()
        self.refresh_pool_policy()
        self.local_fs = CephfsClient(mgr)

    def notify(self, notify_type: NotifyType):
        log.debug(f'got notify type {notify_type}')
        if notify_type == NotifyType.fs_map:
            with self.lock:
                self.fs_map = self.mgr.get('fs_map')
                self.refresh_pool_policy_locked()

    @staticmethod
    def make_spec(client_name, cluster_name):
        return f'{client_name}@{cluster_name}'

    @staticmethod
    def split_spec(spec):
        try:
            client_id, cluster_name = spec.split('@')
            _, client_name = client_id.split('.')
            return client_name, cluster_name
        except ValueError:
            raise MirrorException(-errno.EINVAL, f'invalid cluster spec {spec}')

    @staticmethod
    def get_metadata_pool(filesystem, fs_map):
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['mdsmap']['metadata_pool']
        return None

    @staticmethod
    def get_filesystem_id(filesystem, fs_map):
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['id']
        return None

    @staticmethod
    def peer_config_key(filesystem, peer_uuid):
        return f'{FSSnapshotMirror.PEER_CONFIG_KEY_PREFIX}/{filesystem}/{peer_uuid}'

    def config_set(self, key, val=None):
        """set or remove a key from mon config store"""
        if val:
            cmd = {'prefix': 'config-key set',
                   'key': key, 'val': val}
        else:
            cmd = {'prefix': 'config-key rm',
                   'key': key}
        r, outs, err = self.mgr.mon_command(cmd)
        if r < 0:
            log.error(f'mon command to set/remove config-key {key} failed: {err}')
            raise Exception(-errno.EINVAL)

    def config_get(self, key):
        """fetch a config key value from mon config store"""
        cmd = {'prefix': 'config-key get', 'key': key}
        r, outs, err = self.mgr.mon_command(cmd)
        if r < 0 and not r == -errno.ENOENT:
            log.error(f'mon command to get config-key {key} failed: {err}')
            raise Exception(-errno.EINVAL)
        val = {}
        if r == 0:
            val = json.loads(outs)
        return val

    def filesystem_exist(self, filesystem):
        for fs in self.fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return True
        return False

    def get_mirrored_filesystems(self):
        return [fs['mdsmap']['fs_name'] for fs in self.fs_map['filesystems'] if fs.get('mirror_info', None)]

    def get_filesystem_peers(self, filesystem):
        """To be used when mirroring is enabled for the filesystem"""
        for fs in self.fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['mirror_info']['peers']
        return None

    def peer_exists(self, filesystem, remote_cluster_spec, remote_fs_name):
        peers = self.get_filesystem_peers(filesystem)
        for _, rem in peers.items():
            remote = rem['remote']
            spec = FSSnapshotMirror.make_spec(remote['client_name'], remote['cluster_name'])
            if spec == remote_cluster_spec and remote['fs_name'] == remote_fs_name:
                return True
        return False

    @staticmethod
    def get_mirror_info(fs):
        try:
            val = fs.getxattr('/', 'ceph.mirror.info')
            match = re.search(r'^cluster_id=([a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}) fs_id=(\d+)$',
                              val.decode('utf-8'))
            if match and len(match.groups()) == 2:
                return {'cluster_id': match.group(1),
                        'fs_id': int(match.group(2))
                        }
            raise MirrorException(-errno.EINVAL, 'invalid ceph.mirror.info value format')
        except cephfs.Error as e:
            raise MirrorException(-e.errno, 'error fetching ceph.mirror.info xattr')
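
    # The 'ceph.mirror.info' xattr on the remote filesystem root records which
    # cluster/filesystem owns the peer; os.XATTR_CREATE makes a second claim
    # fail with EEXIST, so a filesystem cannot be mirrored to from two sources.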
    @staticmethod
    def set_mirror_info(local_cluster_id, local_fsid, remote_fs):
        log.info(f'setting {local_cluster_id}::{local_fsid} on remote')
        try:
            remote_fs.setxattr('/', 'ceph.mirror.info',
                               f'cluster_id={local_cluster_id} fs_id={local_fsid}'.encode('utf-8'), os.XATTR_CREATE)
        except cephfs.Error as e:
            if e.errno == errno.EEXIST:
                try:
                    mi = FSSnapshotMirror.get_mirror_info(remote_fs)
                except MirrorException:
                    # if mirror info cannot be fetched for some reason, let's just
                    # fail.
                    raise MirrorException(-errno.EEXIST, 'already an active peer')
                cluster_id = mi['cluster_id']
                fs_id = mi['fs_id']
                if not (cluster_id == local_cluster_id and fs_id == local_fsid):
                    raise MirrorException(-errno.EEXIST, f'peer mirrored by: (cluster_id: {cluster_id}, fs_id: {fs_id})')
            else:
                log.error(f'error setting mirrored fsid: {e}')
                raise Exception(-e.errno)

    def resolve_peer(self, fs_name, peer_uuid):
        peers = self.get_filesystem_peers(fs_name)
        for peer, rem in peers.items():
            if peer == peer_uuid:
                return rem['remote']
        return None

    def purge_mirror_info(self, local_fs_name, peer_uuid):
        log.debug(f'local fs={local_fs_name} peer_uuid={peer_uuid}')
        # resolve the peer to its spec
        rem = self.resolve_peer(local_fs_name, peer_uuid)
        if not rem:
            return
        log.debug(f'peer_uuid={peer_uuid} resolved to {rem}')
        _, client_name = rem['client_name'].split('.')

        # fetch auth details from config store
        remote_conf = self.config_get(FSSnapshotMirror.peer_config_key(local_fs_name, peer_uuid))
        remote_cluster, remote_fs = connect_to_filesystem(client_name,
                                                          rem['cluster_name'],
                                                          rem['fs_name'], 'remote', conf_dct=remote_conf)
        try:
            remote_fs.removexattr('/', 'ceph.mirror.info')
        except cephfs.Error as e:
            if not e.errno == errno.ENOENT:
                log.error('error removing mirror info')
                raise Exception(-e.errno)
        finally:
            disconnect_from_filesystem(rem['cluster_name'], rem['fs_name'], remote_cluster, remote_fs)

    def verify_and_set_mirror_info(self, local_fs_name, remote_cluster_spec, remote_fs_name, remote_conf={}):
        log.debug(f'local fs={local_fs_name} remote={remote_cluster_spec}/{remote_fs_name}')

        client_name, cluster_name = FSSnapshotMirror.split_spec(remote_cluster_spec)
        remote_cluster, remote_fs = connect_to_filesystem(client_name, cluster_name, remote_fs_name,
                                                          'remote', conf_dct=remote_conf)
        try:
            local_cluster_id = self.rados.get_fsid()
            remote_cluster_id = remote_cluster.get_fsid()
            log.debug(f'local_cluster_id={local_cluster_id} remote_cluster_id={remote_cluster_id}')
            if 'fsid' in remote_conf:
                if not remote_cluster_id == remote_conf['fsid']:
                    raise MirrorException(-errno.EINVAL, 'FSID mismatch between bootstrap token and remote cluster')

            local_fscid = remote_fscid = None
            with open_filesystem(self.local_fs, local_fs_name) as local_fsh:
                local_fscid = local_fsh.get_fscid()
                remote_fscid = remote_fs.get_fscid()
                log.debug(f'local_fscid={local_fscid} remote_fscid={remote_fscid}')
                mi = None
                try:
                    mi = FSSnapshotMirror.get_mirror_info(local_fsh)
                except MirrorException as me:
                    if me.args[0] != -errno.ENODATA:
                        raise Exception(-errno.EINVAL)
                if mi and mi['cluster_id'] == remote_cluster_id and mi['fs_id'] == remote_fscid:
                    raise MirrorException(-errno.EINVAL, f'file system is an active peer for file system: {remote_fs_name}')

            if local_cluster_id == remote_cluster_id and local_fscid == remote_fscid:
                raise MirrorException(-errno.EINVAL, "source and destination cluster fsid and "
                                      "file-system name can't be the same")
            FSSnapshotMirror.set_mirror_info(local_cluster_id, local_fscid, remote_fs)
        finally:
            disconnect_from_filesystem(cluster_name, remote_fs_name, remote_cluster, remote_fs)

    def init_pool_policy(self, filesystem):
        metadata_pool_id = FSSnapshotMirror.get_metadata_pool(filesystem, self.fs_map)
        if not metadata_pool_id:
            log.error(f'cannot find metadata pool-id for filesystem {filesystem}')
            raise Exception(-errno.EINVAL)
        try:
            ioctx = self.rados.open_ioctx2(metadata_pool_id)
            # TODO: make async if required
            dir_mapping = load_dir_map(ioctx)
            instances = load_instances(ioctx)

            fspolicy = FSPolicy(self.mgr, ioctx)
            log.debug(f'init policy for filesystem {filesystem}: pool-id {metadata_pool_id}')
            fspolicy.init(dir_mapping, instances)
            self.pool_policy[filesystem] = fspolicy
        except rados.Error as e:
            log.error(f'failed to access pool-id {metadata_pool_id} for filesystem {filesystem}: {e}')
            raise Exception(-e.errno)

    def refresh_pool_policy_locked(self):
        filesystems = self.get_mirrored_filesystems()
        log.debug(f'refreshing policy for {filesystems}')
        for filesystem in list(self.pool_policy):
            if filesystem not in filesystems:
                log.info(f'shutdown pool policy for {filesystem}')
                fspolicy = self.pool_policy.pop(filesystem)
                fspolicy.shutdown()
        for filesystem in filesystems:
            if filesystem not in self.pool_policy:
                log.info(f'init pool policy for {filesystem}')
                self.init_pool_policy(filesystem)

    def refresh_pool_policy(self):
        with self.lock:
            self.refresh_pool_policy_locked()

    def enable_mirror(self, filesystem):
        log.info(f'enabling mirror for filesystem {filesystem}')
        with self.lock:
            try:
                metadata_pool_id = FSSnapshotMirror.get_metadata_pool(filesystem, self.fs_map)
                if not metadata_pool_id:
                    log.error(f'cannot find metadata pool-id for filesystem {filesystem}')
                    raise Exception(-errno.EINVAL)
                create_mirror_object(self.rados, metadata_pool_id)
                cmd = {'prefix': 'fs mirror enable', 'fs_name': filesystem}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to enable mirror failed: {err}')
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
            except MirrorException as me:
                return me.args[0], '', me.args[1]
            except Exception as e:
                return e.args[0], '', 'failed to enable mirroring'

    def disable_mirror(self, filesystem):
        log.info(f'disabling mirror for filesystem {filesystem}')
        try:
            with self.lock:
                cmd = {'prefix': 'fs mirror disable', 'fs_name': filesystem}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to disable mirror failed: {err}')
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to disable mirroring'

    def peer_list(self, filesystem):
        try:
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                peers = self.get_filesystem_peers(filesystem)
                peer_res = {}
                for peer_uuid, rem in peers.items():
                    conf = self.config_get(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid))
                    remote = rem['remote']
                    peer_res[peer_uuid] = {'client_name': remote['client_name'],
                                           'site_name': remote['cluster_name'],
                                           'fs_name': remote['fs_name']
                                           }
                    if 'mon_host' in conf:
                        peer_res[peer_uuid]['mon_host'] = conf['mon_host']
                return 0, json.dumps(peer_res), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to list peers'

    def peer_add(self, filesystem, remote_cluster_spec, remote_fs_name, remote_conf):
        try:
            if remote_fs_name is None:
                remote_fs_name = filesystem
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                ### peer updates for key, site-name are not yet supported
                if self.peer_exists(filesystem, remote_cluster_spec, remote_fs_name):
                    return 0, json.dumps({}), ''
                self.verify_and_set_mirror_info(filesystem, remote_cluster_spec, remote_fs_name, remote_conf)
                peer_uuid = str(uuid.uuid4())
                config_key = FSSnapshotMirror.peer_config_key(filesystem, peer_uuid)
                if remote_conf.get('mon_host') and remote_conf.get('key'):
                    self.config_set(config_key, json.dumps(remote_conf))
                cmd = {'prefix': 'fs mirror peer_add',
                       'fs_name': filesystem,
                       'uuid': peer_uuid,
                       'remote_cluster_spec': remote_cluster_spec,
                       'remote_fs_name': remote_fs_name}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to add peer failed: {err}')
                    try:
                        log.debug(f'cleaning up config-key for {peer_uuid}')
                        self.config_set(config_key)
                    except Exception:
                        pass
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to add peer'

    def peer_remove(self, filesystem, peer_uuid):
        try:
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                # ok, this is being a bit lazy. remove mirror info from peer followed
                # by purging the peer from fsmap. if the mirror daemon fs map updates
                # are laggy, they happily continue to synchronize. ideally, we should
                # purge the peer from fsmap here and purge mirror info on fsmap update
                # (in notify()). but that's not straightforward -- before purging mirror
                # info, we would need to wait for all mirror daemons to catch up with
                # fsmap updates. this involves mirror daemons sending the fsmap epoch
                # they have seen in reply to a notify request. TODO: fix this.
                self.purge_mirror_info(filesystem, peer_uuid)
                cmd = {'prefix': 'fs mirror peer_remove',
                       'fs_name': filesystem,
                       'uuid': peer_uuid}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to remove peer failed: {err}')
                    raise Exception(-errno.EINVAL)
                self.config_set(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid))
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to remove peer'

    def peer_bootstrap_create(self, fs_name, client_name, site_name):
        """create a bootstrap token for this peer filesystem"""
        try:
            with self.lock:
                cmd = {'prefix': 'fs authorize',
                       'filesystem': fs_name,
                       'entity': client_name,
                       'caps': ['/', 'rwps']}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to create peer user failed: {err}')
                    raise Exception(-errno.EINVAL)
                cmd = {'prefix': 'auth get',
                       'entity': client_name,
                       'format': 'json'}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to fetch keyring failed: {err}')
                    raise Exception(-errno.EINVAL)
                outs = json.loads(outs)
                outs0 = outs[0]
                token_dct = {'fsid': self.mgr.rados.get_fsid(),
                             'filesystem': fs_name,
                             'user': outs0['entity'],
                             'site_name': site_name,
                             'key': outs0['key'],
                             'mon_host': self.mgr.rados.conf_get('mon_host')}
                token_str = json.dumps(token_dct).encode('utf-8')
                encoded_token = base64.b64encode(token_str)
                return 0, json.dumps({'token': encoded_token.decode('utf-8')}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to bootstrap peer'
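
    # The bootstrap token is the base64-encoded JSON blob produced by
    # peer_bootstrap_create() above: 'user', 'site_name' and 'filesystem' are
    # popped to build the peer spec, and the remaining fields (fsid, key,
    # mon_host) are passed through as the remote cluster configuration.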
    def peer_bootstrap_import(self, filesystem, token):
        try:
            token_str = base64.b64decode(token)
            token_dct = json.loads(token_str.decode('utf-8'))
        except Exception:
            return -errno.EINVAL, '', 'failed to parse token'
        client_name = token_dct.pop('user')
        cluster_name = token_dct.pop('site_name')
        remote_fs_name = token_dct.pop('filesystem')
        remote_cluster_spec = f'{client_name}@{cluster_name}'
        return self.peer_add(filesystem, remote_cluster_spec, remote_fs_name, token_dct)

    @staticmethod
    def norm_path(dir_path):
        if not os.path.isabs(dir_path):
            raise MirrorException(-errno.EINVAL, f'{dir_path} should be an absolute path')
        return os.path.normpath(dir_path)

    def add_dir(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                log.debug(f'path normalized to {dir_path}')
                fspolicy.add_dir(dir_path)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to add directory'

    def remove_dir(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                fspolicy.remove_dir(dir_path)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to remove directory'

    def status(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                return fspolicy.status(dir_path)
        except MirrorException as me:
            return me.args[0], '', me.args[1]

    def show_distribution(self, filesystem):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                return fspolicy.summary()
        except MirrorException as me:
            return me.args[0], '', me.args[1]

    def daemon_status(self):
        try:
            daemons = []
            sm = self.mgr.get('service_map')
            daemon_entry = sm['services'].get('cephfs-mirror', None)
            log.debug(f'daemon_entry: {daemon_entry}')
            if daemon_entry is not None:
                for daemon_key in daemon_entry.get('daemons', []):
                    try:
                        daemon_id = int(daemon_key)
                    except ValueError:
                        continue
                    daemon = {
                        'daemon_id': daemon_id,
                        'filesystems': []
                    }  # type: Dict[str, Any]
                    daemon_status = self.mgr.get_daemon_status('cephfs-mirror', daemon_key)
                    if not daemon_status:
                        log.debug(f'daemon status not yet available for cephfs-mirror daemon: {daemon_key}')
                        continue
                    status = json.loads(daemon_status['status_json'])
                    for fs_id, fs_desc in status.items():
                        fs = {'filesystem_id': int(fs_id),
                              'name': fs_desc['name'],
                              'directory_count': fs_desc['directory_count'],
                              'peers': []
                              }  # type: Dict[str, Any]
                        for peer_uuid, peer_desc in fs_desc['peers'].items():
                            peer = {'uuid': peer_uuid,
                                    'remote': peer_desc['remote'],
                                    'stats': peer_desc['stats']
                                    }
                            fs['peers'].append(peer)
                        daemon['filesystems'].append(fs)
                    daemons.append(daemon)
            return 0, json.dumps(daemons), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]