from __future__ import print_function
import argparse
import logging
import os
from textwrap import dedent
from ceph_volume.util import system, disk, merge_dict
from ceph_volume.util.device import Device
from ceph_volume.util.arg_validators import valid_osd_id
from ceph_volume.util import encryption as encryption_utils
from ceph_volume import decorators, terminal, process
from ceph_volume.api import lvm as api
from ceph_volume.systemd import systemctl


logger = logging.getLogger(__name__)
mlogger = terminal.MultiLogger(__name__)


def get_cluster_name(osd_id, osd_fsid):
    """
    From an ``osd_id`` and/or an ``osd_fsid``, filter out all the LVs in the
    system that match those tag values, then return the cluster_name of the
    first match.
    """
    lv_tags = {}
    lv_tags['ceph.osd_id'] = osd_id
    lv_tags['ceph.osd_fsid'] = osd_fsid

    lvs = api.get_lvs(tags=lv_tags)
    if not lvs:
        mlogger.error(
            'Unable to find any LV for source OSD: id:{} fsid:{}'.format(
                osd_id, osd_fsid))
        raise SystemExit('Unexpected error, terminating')
    return next(iter(lvs)).tags["ceph.cluster_name"]


def get_osd_path(osd_id, osd_fsid):
    return '/var/lib/ceph/osd/{}-{}'.format(
        get_cluster_name(osd_id, osd_fsid), osd_id)
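
# Illustrative sketch (not part of the original module; values are
# hypothetical): for a cluster named 'ceph' and OSD id '1', the helper above
# resolves to the usual OSD mount point:
#
#     get_osd_path('1', 'aaaa-bbbb-cccc')  # -> '/var/lib/ceph/osd/ceph-1'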


def find_associated_devices(osd_id, osd_fsid):
    """
    From an ``osd_id`` and/or an ``osd_fsid``, filter out all the LVs in the
    system that match those tag values, further detect if any partitions are
    part of the OSD, and then return the set of LVs and partitions (if any).
    """
    lv_tags = {}
    lv_tags['ceph.osd_id'] = osd_id
    lv_tags['ceph.osd_fsid'] = osd_fsid

    lvs = api.get_lvs(tags=lv_tags)
    if not lvs:
        mlogger.error(
            'Unable to find any LV for source OSD: id:{} fsid:{}'.format(
                osd_id, osd_fsid))
        raise SystemExit('Unexpected error, terminating')

    devices = set(ensure_associated_lvs(lvs, lv_tags))
    return [(Device(path), type) for path, type in devices if path]


def ensure_associated_lvs(lvs, lv_tags):
    """
    Go through each LV and check whether its backing devices (journal, wal,
    block) are LVs or partitions, so that they can be accurately reported.
    """
    # look for many LVs for each backing type, because it is possible to
    # receive a filtering for osd.1, and have multiple failed deployments
    # leaving many journals with osd.1 - usually, only a single LV will be
    # found

    block_lvs = api.get_lvs(tags=merge_dict(lv_tags, {'ceph.type': 'block'}))
    db_lvs = api.get_lvs(tags=merge_dict(lv_tags, {'ceph.type': 'db'}))
    wal_lvs = api.get_lvs(tags=merge_dict(lv_tags, {'ceph.type': 'wal'}))
    backing_devices = [(block_lvs, 'block'), (db_lvs, 'db'),
                       (wal_lvs, 'wal')]

    verified_devices = []

    for lv in lvs:
        # go through each lv and append it, otherwise query `blkid` to find
        # a physical device. Do this for each type (journal, db, wal)
        # regardless of whether they have been processed in the previous LV,
        # so that bad devices with the same ID can be caught
        for ceph_lvs, type in backing_devices:
            if ceph_lvs:
                verified_devices.extend([(l.lv_path, type) for l in ceph_lvs])
                continue

            # must be a disk partition, by querying blkid by the uuid we are
            # ensuring that the device path is always correct
            try:
                device_uuid = lv.tags['ceph.{}_uuid'.format(type)]
            except KeyError:
                # Bluestore will not have ceph.journal_uuid, and Filestore
                # will not have ceph.db_uuid
                continue

            osd_device = disk.get_device_from_partuuid(device_uuid)
            if not osd_device:
                # if the osd_device is not found by the partuuid, then it is
                # not possible to ensure this device exists anymore, so skip it
                continue
            verified_devices.append((osd_device, type))

    return verified_devices
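
# Illustrative sketch (not part of the original module; paths are
# hypothetical): for an OSD with an LV-backed block device and a
# partition-backed DB, the returned list would look like:
#
#     [('/dev/vgname/block_lv', 'block'), ('/dev/sdb1', 'db')]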


class VolumeTagTracker(object):
    def __init__(self, devices, target_lv):
        self.target_lv = target_lv
        self.data_device = self.db_device = self.wal_device = None
        for device, type in devices:
            if type == 'block':
                self.data_device = device
            elif type == 'db':
                self.db_device = device
            elif type == 'wal':
                self.wal_device = device
        if not self.data_device:
            mlogger.error('Data device not found')
            raise SystemExit(
                "Unexpected error, terminating")
        if not self.data_device.is_lv:
            mlogger.error('Data device isn\'t LVM')
            raise SystemExit(
                "Unexpected error, terminating")

        self.old_target_tags = self.target_lv.tags.copy()
        self.old_data_tags = (
            self.data_device.lv_api.tags.copy()
            if self.data_device.is_lv else None)
        self.old_db_tags = (
            self.db_device.lv_api.tags.copy()
            if self.db_device and self.db_device.is_lv else None)
        self.old_wal_tags = (
            self.wal_device.lv_api.tags.copy()
            if self.wal_device and self.wal_device.is_lv else None)
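
    # Illustrative sketch (not part of the original class; values are
    # hypothetical): the tracker snapshots all LVM tags at construction so
    # undo() can restore them if a later step fails, e.g.:
    #
    #     tracker = VolumeTagTracker(
    #         [(data_dev, 'block'), (db_dev, 'db')], target_lv)
    #     # tracker.old_data_tags == {'ceph.osd_id': '1', ...}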

    def update_tags_when_lv_create(self, create_type):
        tags = {}
        if not self.data_device.is_lv:
            mlogger.warning(
                'Data device is not LVM, wouldn\'t update LVM tags')
        else:
            tags["ceph.{}_uuid".format(create_type)] = self.target_lv.lv_uuid
            tags["ceph.{}_device".format(create_type)] = self.target_lv.lv_path
            self.data_device.lv_api.set_tags(tags)

            tags = self.data_device.lv_api.tags.copy()
            tags["ceph.type"] = create_type
            self.target_lv.set_tags(tags)

        aux_dev = None
        if create_type == "db" and self.wal_device:
            aux_dev = self.wal_device
        elif create_type == "wal" and self.db_device:
            aux_dev = self.db_device
        else:
            return
        if not aux_dev.is_lv:
            mlogger.warning(
                '{} device is not LVM, wouldn\'t update LVM tags'.format(
                    create_type.upper()))
        else:
            tags = {}
            tags["ceph.{}_uuid".format(create_type)] = self.target_lv.lv_uuid
            tags["ceph.{}_device".format(create_type)] = self.target_lv.lv_path
            aux_dev.lv_api.set_tags(tags)
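
    # Illustrative sketch (hypothetical values): after attaching a new DB
    # volume, the data device would carry tags such as:
    #
    #     {'ceph.db_uuid': '<target lv uuid>',
    #      'ceph.db_device': '/dev/vgname/new_db'}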

    def remove_lvs(self, source_devices, target_type):
        remaining_devices = [self.data_device, self.db_device,
                             self.wal_device]

        outdated_tags = []
        for device, type in source_devices:
            if type == "block" or type == target_type:
                continue
            remaining_devices.remove(device)
            if device.is_lv:
                outdated_tags.append("ceph.{}_uuid".format(type))
                outdated_tags.append("ceph.{}_device".format(type))
                device.lv_api.clear_tags()
        if len(outdated_tags) > 0:
            for d in remaining_devices:
                if d and d.is_lv:
                    d.lv_api.clear_tags(outdated_tags)

    def replace_lvs(self, source_devices, target_type):
        remaining_devices = [self.data_device]
        if self.db_device:
            remaining_devices.append(self.db_device)
        if self.wal_device:
            remaining_devices.append(self.wal_device)

        outdated_tags = []
        for device, type in source_devices:
            if type == "block":
                continue
            remaining_devices.remove(device)
            if device.is_lv:
                outdated_tags.append("ceph.{}_uuid".format(type))
                outdated_tags.append("ceph.{}_device".format(type))
                device.lv_api.clear_tags()

        new_tags = {}
        new_tags["ceph.{}_uuid".format(target_type)] = self.target_lv.lv_uuid
        new_tags["ceph.{}_device".format(target_type)] = self.target_lv.lv_path

        for d in remaining_devices:
            if d and d.is_lv:
                if len(outdated_tags) > 0:
                    d.lv_api.clear_tags(outdated_tags)
                d.lv_api.set_tags(new_tags)

        if not self.data_device.is_lv:
            mlogger.warning(
                'Data device is not LVM, wouldn\'t properly update target '
                'LVM tags')
        else:
            tags = self.data_device.lv_api.tags.copy()
            tags["ceph.type"] = target_type
            tags["ceph.{}_uuid".format(target_type)] = self.target_lv.lv_uuid
            tags["ceph.{}_device".format(target_type)] = self.target_lv.lv_path
            self.target_lv.set_tags(tags)
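
    # Illustrative sketch (hypothetical values): after replace_lvs() with
    # target_type='db', every remaining volume carries the new pointers:
    #
    #     {'ceph.db_uuid': '<target lv uuid>',
    #      'ceph.db_device': '/dev/vgname/new_db'}
    #
    # while the stale ceph.db_* tags of the replaced source are cleared.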

    def undo(self):
        mlogger.info(
            'Undoing lv tag set')
        if self.data_device:
            if self.old_data_tags:
                self.data_device.lv_api.set_tags(self.old_data_tags)
            else:
                self.data_device.lv_api.clear_tags()
        if self.db_device:
            if self.old_db_tags:
                self.db_device.lv_api.set_tags(self.old_db_tags)
            else:
                self.db_device.lv_api.clear_tags()
        if self.wal_device:
            if self.old_wal_tags:
                self.wal_device.lv_api.set_tags(self.old_wal_tags)
            else:
                self.wal_device.lv_api.clear_tags()
        if self.old_target_tags:
            self.target_lv.set_tags(self.old_target_tags)
        else:
            self.target_lv.clear_tags()


class Migrate(object):

    help = 'Migrate BlueFS data from one LVM device to another'

    def __init__(self, argv):
        self.argv = argv

    def get_source_devices(self, devices, target_type=""):
        ret = []
        for device, type in devices:
            if type == target_type:
                continue
            if type == 'block':
                if 'data' not in self.args.from_:
                    continue
            elif type == 'db':
                if 'db' not in self.args.from_:
                    continue
            elif type == 'wal':
                if 'wal' not in self.args.from_:
                    continue
            ret.append([device, type])
        if ret == []:
            mlogger.error('Source device list is empty')
            raise SystemExit(
                'Unable to migrate to : {}'.format(self.args.target))
        return ret

    # ceph-bluestore-tool uses the following replacement rules
    # (in the order of precedence, stop on the first match):
    # - if the source list has a DB volume - the target device replaces it.
    # - if the source list has a WAL volume - the target device replaces it.
    # - if the source list has the slow volume only - the operation is not
    #   permitted and requires explicit allocation via the new-db/new-wal
    #   command.
    def get_target_type_by_source(self, devices):
        ret = None
        for device, type in devices:
            if type == 'db':
                return 'db'
            elif type == 'wal':
                ret = 'wal'
        return ret

    def get_filename_by_type(self, type):
        filename = 'block'
        if type == 'db' or type == 'wal':
            filename += '.' + type
        return filename

    def get_source_args(self, osd_path, devices):
        ret = []
        for device, type in devices:
            ret = ret + ["--devs-source", os.path.join(
                osd_path, self.get_filename_by_type(type))]
        return ret
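
    # Illustrative sketch (hypothetical paths): for a 'db' source device on
    # OSD 1, the generated arguments would be:
    #
    #     ['--devs-source', '/var/lib/ceph/osd/ceph-1/block.db']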

    def close_encrypted(self, source_devices):
        # close source device(s) if they're encrypted and have been removed
        for device, type in source_devices:
            if (type == 'db' or type == 'wal'):
                logger.info("closing dmcrypt volume {}".format(
                    device.lv_api.lv_uuid))
                encryption_utils.dmcrypt_close(
                    mapping=device.lv_api.lv_uuid, skip_path_check=True)

    @decorators.needs_root
    def migrate_to_new(self, osd_id, osd_fsid, devices, target_lv):
        source_devices = self.get_source_devices(devices)
        target_type = self.get_target_type_by_source(source_devices)
        if not target_type:
            mlogger.error(
                "Unable to determine new volume type,"
                " please use new-db or new-wal command first.")
            raise SystemExit(
                "Unable to migrate to : {}".format(self.args.target))

        target_path = target_lv.lv_path
        tag_tracker = VolumeTagTracker(devices, target_lv)
        # prepare and encrypt target if data volume is encrypted
        if tag_tracker.data_device.lv_api.encrypted:
            secret = encryption_utils.get_dmcrypt_key(osd_id, osd_fsid)
            mlogger.info(' preparing dmcrypt for {}, uuid {}'.format(
                target_lv.lv_path, target_lv.lv_uuid))
            target_path = encryption_utils.prepare_dmcrypt(
                key=secret, device=target_path, mapping=target_lv.lv_uuid)
        try:
            # we need to update lvm tags for all the remaining volumes
            # and clear them for the ones to be removed

            # ceph-bluestore-tool removes source volume(s) other than the
            # block one and attaches the target one after successful migration
            tag_tracker.replace_lvs(source_devices, target_type)

            osd_path = get_osd_path(osd_id, osd_fsid)
            source_args = self.get_source_args(osd_path, source_devices)
            mlogger.info("Migrate to new, Source: {} Target: {}".format(
                source_args, target_path))
            stdout, stderr, exit_code = process.call([
                'ceph-bluestore-tool',
                '--path',
                osd_path,
                '--dev-target',
                target_path,
                '--command',
                'bluefs-bdev-migrate'] +
                source_args)
            if exit_code != 0:
                mlogger.error(
                    'Failed to migrate device, error code:{}'.format(
                        exit_code))
                raise SystemExit(
                    'Failed to migrate to : {}'.format(self.args.target))

            system.chown(os.path.join(osd_path, "block.{}".format(
                target_type)))
            if tag_tracker.data_device.lv_api.encrypted:
                self.close_encrypted(source_devices)
            terminal.success('Migration successful.')
        except:
            tag_tracker.undo()
            raise

        return
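
    # Illustrative sketch (hypothetical paths): for OSD 1 migrating its DB to
    # a fresh LV, the call above assembles a command line such as:
    #
    #     ceph-bluestore-tool --path /var/lib/ceph/osd/ceph-1 \
    #         --dev-target /dev/vgname/new_db --command bluefs-bdev-migrate \
    #         --devs-source /var/lib/ceph/osd/ceph-1/block.db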

    @decorators.needs_root
    def migrate_to_existing(self, osd_id, osd_fsid, devices, target_lv):
        target_type = target_lv.tags["ceph.type"]
        if target_type == "wal":
            mlogger.error("Migrate to WAL is not supported")
            raise SystemExit(
                "Unable to migrate to : {}".format(self.args.target))
        target_filename = self.get_filename_by_type(target_type)
        if (target_filename == ""):
            mlogger.error(
                "Target Logical Volume doesn't have proper volume type "
                "(ceph.type LVM tag): {}".format(target_type))
            raise SystemExit(
                "Unable to migrate to : {}".format(self.args.target))

        osd_path = get_osd_path(osd_id, osd_fsid)
        source_devices = self.get_source_devices(devices, target_type)
        target_path = os.path.join(osd_path, target_filename)
        tag_tracker = VolumeTagTracker(devices, target_lv)

        try:
            # ceph-bluestore-tool removes source volume(s) other than
            # block and target ones after successful migration
            tag_tracker.remove_lvs(source_devices, target_type)
            source_args = self.get_source_args(osd_path, source_devices)
            mlogger.info("Migrate to existing, Source: {} Target: {}".format(
                source_args, target_path))
            stdout, stderr, exit_code = process.call([
                'ceph-bluestore-tool',
                '--path',
                osd_path,
                '--dev-target',
                target_path,
                '--command',
                'bluefs-bdev-migrate'] +
                source_args)
            if exit_code != 0:
                mlogger.error(
                    'Failed to migrate device, error code:{}'.format(
                        exit_code))
                raise SystemExit(
                    'Failed to migrate to : {}'.format(self.args.target))
            if tag_tracker.data_device.lv_api.encrypted:
                self.close_encrypted(source_devices)
            terminal.success('Migration successful.')
        except:
            tag_tracker.undo()
            raise

        return
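
    # Illustrative sketch (hypothetical paths): when migrating into an
    # already-attached DB volume, the target is addressed through the OSD
    # directory rather than the raw LV:
    #
    #     ceph-bluestore-tool --path /var/lib/ceph/osd/ceph-1 \
    #         --dev-target /var/lib/ceph/osd/ceph-1/block.db \
    #         --command bluefs-bdev-migrate \
    #         --devs-source /var/lib/ceph/osd/ceph-1/block.wal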

    @decorators.needs_root
    def migrate_osd(self):
        if self.args.osd_id and not self.args.no_systemd:
            osd_is_running = systemctl.osd_is_active(self.args.osd_id)
            if osd_is_running:
                mlogger.error('OSD is running, stop it with: '
                              'systemctl stop ceph-osd@{}'.format(
                                  self.args.osd_id))
                raise SystemExit(
                    'Unable to migrate devices associated with OSD ID: {}'
                    .format(self.args.osd_id))

        target_lv = api.get_lv_by_fullname(self.args.target)
        if not target_lv:
            mlogger.error(
                'Target path "{}" is not a Logical Volume'.format(
                    self.args.target))
            raise SystemExit(
                'Unable to migrate to : {}'.format(self.args.target))
        devices = find_associated_devices(self.args.osd_id,
                                          self.args.osd_fsid)
        if (not target_lv.used_by_ceph):
            self.migrate_to_new(self.args.osd_id, self.args.osd_fsid,
                                devices,
                                target_lv)
        else:
            if (target_lv.tags['ceph.osd_id'] != self.args.osd_id or
                    target_lv.tags['ceph.osd_fsid'] != self.args.osd_fsid):
                mlogger.error(
                    'Target Logical Volume isn\'t used by the specified OSD: '
                    '{} FSID: {}'.format(self.args.osd_id,
                                         self.args.osd_fsid))
                raise SystemExit(
                    'Unable to migrate to : {}'.format(self.args.target))

            self.migrate_to_existing(self.args.osd_id, self.args.osd_fsid,
                                     devices,
                                     target_lv)

    def make_parser(self, prog, sub_command_help):
        parser = argparse.ArgumentParser(
            prog=prog,
            formatter_class=argparse.RawDescriptionHelpFormatter,
            description=sub_command_help,
        )

        parser.add_argument(
            '--osd-id',
            required=True,
            help='Specify an OSD ID to detect associated devices for migration',
            type=valid_osd_id,
        )
        parser.add_argument(
            '--osd-fsid',
            required=True,
            help='Specify an OSD FSID to detect associated devices for migration',
        )
        parser.add_argument(
            '--target',
            required=True,
            help='Specify target Logical Volume (LV) to migrate data to',
        )
        parser.add_argument(
            '--from',
            nargs='*',
            dest='from_',
            required=True,
            choices=['data', 'db', 'wal'],
            help='Copy BlueFS data from the given source device type(s)',
        )
        parser.add_argument(
            '--no-systemd',
            dest='no_systemd',
            action='store_true',
            help='Skip checking OSD systemd unit',
        )
        return parser

    def main(self):
        sub_command_help = dedent("""
        Moves BlueFS data from source volume(s) to the target volume; source
        volumes (except the main, i.e. data or block, one) are removed on
        success. Only LVM volumes are permitted as the target, either already
        attached or a new logical volume. In the latter case it is attached to
        the OSD, replacing one of the source devices. The following replacement
        rules apply (in the order of precedence, stop on the first match):
        * if the source list has a DB volume - the target device replaces it.
        * if the source list has a WAL volume - the target device replaces it.
        * if the source list has the slow volume only - the operation is not
          permitted and requires explicit allocation via new-db/new-wal command.

        Example calls for supported scenarios:

          Moves BlueFS data from main device to LV already attached as DB:

            ceph-volume lvm migrate --osd-id 1 --osd-fsid <uuid> --from data --target vgname/db

          Moves BlueFS data from shared main device to LV which will be
          attached as a new DB:

            ceph-volume lvm migrate --osd-id 1 --osd-fsid <uuid> --from data --target vgname/new_db

          Moves BlueFS data from DB device to new LV, DB is replaced:

            ceph-volume lvm migrate --osd-id 1 --osd-fsid <uuid> --from db --target vgname/new_db

          Moves BlueFS data from main and DB devices to new LV, DB is replaced:

            ceph-volume lvm migrate --osd-id 1 --osd-fsid <uuid> --from data db --target vgname/new_db

          Moves BlueFS data from main, DB and WAL devices to new LV, WAL is
          removed and DB is replaced:

            ceph-volume lvm migrate --osd-id 1 --osd-fsid <uuid> --from data db wal --target vgname/new_db

          Moves BlueFS data from main, DB and WAL devices to main device, WAL
          and DB are removed:

            ceph-volume lvm migrate --osd-id 1 --osd-fsid <uuid> --from db wal --target vgname/data

        """)

        parser = self.make_parser('ceph-volume lvm migrate', sub_command_help)

        if len(self.argv) == 0:
            print(sub_command_help)
            return

        self.args = parser.parse_args(self.argv)

        self.migrate_osd()


class NewVolume(object):
    def __init__(self, create_type, argv):
        self.create_type = create_type
        self.argv = argv

    def make_parser(self, prog, sub_command_help):
        parser = argparse.ArgumentParser(
            prog=prog,
            formatter_class=argparse.RawDescriptionHelpFormatter,
            description=sub_command_help,
        )

        parser.add_argument(
            '--osd-id',
            required=True,
            help='Specify an OSD ID to attach new volume to',
            type=valid_osd_id,
        )
        parser.add_argument(
            '--osd-fsid',
            required=True,
            help='Specify an OSD FSID to attach new volume to',
        )
        parser.add_argument(
            '--target',
            required=True,
            help='Specify target Logical Volume (LV) to attach',
        )
        parser.add_argument(
            '--no-systemd',
            dest='no_systemd',
            action='store_true',
            help='Skip checking OSD systemd unit',
        )
        return parser

    @decorators.needs_root
    def make_new_volume(self, osd_id, osd_fsid, devices, target_lv):
        osd_path = get_osd_path(osd_id, osd_fsid)
        mlogger.info(
            'Making new volume at {} for OSD: {} ({})'.format(
                target_lv.lv_path, osd_id, osd_path))
        target_path = target_lv.lv_path
        tag_tracker = VolumeTagTracker(devices, target_lv)
        # prepare and encrypt target if data volume is encrypted
        if tag_tracker.data_device.lv_api.encrypted:
            secret = encryption_utils.get_dmcrypt_key(osd_id, osd_fsid)
            mlogger.info(' preparing dmcrypt for {}, uuid {}'.format(
                target_lv.lv_path, target_lv.lv_uuid))
            target_path = encryption_utils.prepare_dmcrypt(
                key=secret, device=target_path, mapping=target_lv.lv_uuid)

        try:
            tag_tracker.update_tags_when_lv_create(self.create_type)

            stdout, stderr, exit_code = process.call([
                'ceph-bluestore-tool',
                '--path',
                osd_path,
                '--dev-target',
                target_path,
                '--command',
                'bluefs-bdev-new-{}'.format(self.create_type)
            ])
            if exit_code != 0:
                mlogger.error(
                    'failed to attach new volume, error code:{}'.format(
                        exit_code))
                raise SystemExit(
                    "Failed to attach new volume: {}".format(
                        self.args.target))
            else:
                system.chown(os.path.join(osd_path, "block.{}".format(
                    self.create_type)))
                terminal.success('New volume attached.')
        except:
            tag_tracker.undo()
            raise
        return
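
    # Illustrative sketch (hypothetical paths): attaching a new DB volume to
    # OSD 1 runs a command such as:
    #
    #     ceph-bluestore-tool --path /var/lib/ceph/osd/ceph-1 \
    #         --dev-target /dev/vgname/new_db --command bluefs-bdev-new-db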

    @decorators.needs_root
    def new_volume(self):
        if self.args.osd_id and not self.args.no_systemd:
            osd_is_running = systemctl.osd_is_active(self.args.osd_id)
            if osd_is_running:
                mlogger.error('OSD is running, stop it with:'
                              ' systemctl stop ceph-osd@{}'.format(
                                  self.args.osd_id))
                raise SystemExit(
                    'Unable to attach new volume for OSD: {}'.format(
                        self.args.osd_id))

        target_lv = api.get_lv_by_fullname(self.args.target)
        if not target_lv:
            mlogger.error(
                'Target path {} is not a Logical Volume'.format(
                    self.args.target))
            raise SystemExit(
                'Unable to attach new volume : {}'.format(self.args.target))
        if target_lv.used_by_ceph:
            mlogger.error(
                'Target Logical Volume is already used by ceph: {}'.format(
                    self.args.target))
            raise SystemExit(
                'Unable to attach new volume : {}'.format(self.args.target))
        else:
            devices = find_associated_devices(self.args.osd_id,
                                              self.args.osd_fsid)
            self.make_new_volume(
                self.args.osd_id,
                self.args.osd_fsid,
                devices,
                target_lv)


class NewWAL(NewVolume):

    help = 'Allocate new WAL volume for OSD at specified Logical Volume'

    def __init__(self, argv):
        super(NewWAL, self).__init__("wal", argv)

    def main(self):
        sub_command_help = dedent("""
        Attaches the given logical volume to the given OSD as a WAL volume.
        Logical volume format is vg/lv. Fails if the OSD already has a WAL
        attached.

        Example:

          Attach vgname/lvname as a WAL volume to OSD 1

              ceph-volume lvm new-wal --osd-id 1 --osd-fsid 55BD4219-16A7-4037-BC20-0F158EFCC83D --target vgname/new_wal
        """)
        parser = self.make_parser('ceph-volume lvm new-wal', sub_command_help)

        if len(self.argv) == 0:
            print(sub_command_help)
            return

        self.args = parser.parse_args(self.argv)

        self.new_volume()


class NewDB(NewVolume):

    help = 'Allocate new DB volume for OSD at specified Logical Volume'

    def __init__(self, argv):
        super(NewDB, self).__init__("db", argv)

    def main(self):
        sub_command_help = dedent("""
        Attaches the given logical volume to the given OSD as a DB volume.
        Logical volume format is vg/lv. Fails if the OSD already has a DB
        attached.

        Example:

          Attach vgname/lvname as a DB volume to OSD 1

              ceph-volume lvm new-db --osd-id 1 --osd-fsid 55BD4219-16A7-4037-BC20-0F158EFCC83D --target vgname/new_db
        """)
        parser = self.make_parser('ceph-volume lvm new-db', sub_command_help)
        if len(self.argv) == 0:
            print(sub_command_help)
            return
        self.args = parser.parse_args(self.argv)

        self.new_volume()