1 from __future__
import print_function
5 from textwrap
import dedent
6 from ceph_volume
.util
import system
, disk
, merge_dict
7 from ceph_volume
.util
.device
import Device
8 from ceph_volume
.util
.arg_validators
import valid_osd_id
9 from ceph_volume
import decorators
, terminal
, process
10 from ceph_volume
.api
import lvm
as api
11 from ceph_volume
.systemd
import systemctl
# Module-level loggers: ``logger`` writes to the regular ceph-volume log;
# ``mlogger`` is terminal.MultiLogger — presumably it also echoes messages to
# the user's terminal (NOTE(review): confirm against ceph_volume.terminal).
logger = logging.getLogger(__name__)
mlogger = terminal.MultiLogger(__name__)
def get_cluster_name(osd_id, osd_fsid):
    """
    From an ``osd_id`` and/or an ``osd_fsid``, filter out all the LVs in the
    system that match those tag values, then return cluster_name for the first
    one.

    :param osd_id: OSD id used to filter LVs via the ``ceph.osd_id`` tag
    :param osd_fsid: OSD fsid used to filter LVs via the ``ceph.osd_fsid`` tag
    :raises SystemExit: if no LV matches the given id/fsid
    """
    lv_tags = {}
    lv_tags['ceph.osd_id'] = osd_id
    lv_tags['ceph.osd_fsid'] = osd_fsid

    lvs = api.get_lvs(tags=lv_tags)
    if not lvs:
        mlogger.error(
            'Unable to find any LV for source OSD: id:{} fsid:{}'.format(
                osd_id, osd_fsid))
        raise SystemExit('Unexpected error, terminating')
    # all LVs of one OSD carry the same ceph.cluster_name tag, so the first
    # match is as good as any
    return next(iter(lvs)).tags["ceph.cluster_name"]
def get_osd_path(osd_id, osd_fsid):
    # OSD data directory, e.g. /var/lib/ceph/osd/ceph-1; the cluster name is
    # recovered from the LVM tags of the OSD's volumes.
    return '/var/lib/ceph/osd/{}-{}'.format(
        get_cluster_name(osd_id, osd_fsid), osd_id)
def find_associated_devices(osd_id, osd_fsid):
    """
    From an ``osd_id`` and/or an ``osd_fsid``, filter out all the LVs in the
    system that match those tag values, further detect if any partitions are
    part of the OSD, and then return the set of LVs and partitions (if any).

    :return: list of ``(Device, type)`` tuples where type is one of
             block/db/wal
    :raises SystemExit: if no LV matches the given id/fsid
    """
    lv_tags = {}
    lv_tags['ceph.osd_id'] = osd_id
    lv_tags['ceph.osd_fsid'] = osd_fsid

    lvs = api.get_lvs(tags=lv_tags)
    if not lvs:
        mlogger.error(
            'Unable to find any LV for source OSD: id:{} fsid:{}'.format(
                osd_id, osd_fsid))
        raise SystemExit('Unexpected error, terminating')

    devices = set(ensure_associated_lvs(lvs, lv_tags))
    # drop empty paths (partitions that could not be resolved anymore)
    return [(Device(path), type) for path, type in devices if path]
def ensure_associated_lvs(lvs, lv_tags):
    """
    Go through each LV and ensure if backing devices (journal, wal, block)
    are LVs or partitions, so that they can be accurately reported.

    :param lvs: LVs belonging to the OSD being inspected
    :param lv_tags: base tag filter (ceph.osd_id / ceph.osd_fsid)
    :return: list of ``(path, type)`` tuples
    """
    # look for many LVs for each backing type, because it is possible to
    # receive a filtering for osd.1, and have multiple failed deployments
    # leaving many journals with osd.1 - usually, only a single LV will be
    # found
    block_lvs = api.get_lvs(tags=merge_dict(lv_tags, {'ceph.type': 'block'}))
    db_lvs = api.get_lvs(tags=merge_dict(lv_tags, {'ceph.type': 'db'}))
    wal_lvs = api.get_lvs(tags=merge_dict(lv_tags, {'ceph.type': 'wal'}))
    backing_devices = [(block_lvs, 'block'), (db_lvs, 'db'),
                       (wal_lvs, 'wal')]

    verified_devices = []

    for lv in lvs:
        # go through each lv and append it, otherwise query `blkid` to find
        # a physical device. Do this for each type (journal,db,wal) regardless
        # if they have been processed in the previous LV, so that bad devices
        # with the same ID can be caught
        for ceph_lvs, type in backing_devices:
            if ceph_lvs:
                verified_devices.extend([(l.lv_path, type) for l in ceph_lvs])
                continue

            # must be a disk partition, by querying blkid by the uuid we are
            # ensuring that the device path is always correct
            try:
                device_uuid = lv.tags['ceph.{}_uuid'.format(type)]
            except KeyError:
                # Bluestore will not have ceph.journal_uuid, and Filestore
                # will not have ceph.db_uuid
                continue

            osd_device = disk.get_device_from_partuuid(device_uuid)
            if not osd_device:
                # if the osd_device is not found by the partuuid, then it is
                # not possible to ensure this device exists anymore, so skip it
                continue
            verified_devices.append((osd_device, type))

    return verified_devices
106 class VolumeTagTracker(object):
107 def __init__(self
, devices
, target_lv
):
108 self
.target_lv
= target_lv
109 self
.data_device
= self
.db_device
= self
.wal_device
= None
110 for device
, type in devices
:
112 self
.data_device
= device
114 self
.db_device
= device
116 self
.wal_device
= device
117 if not self
.data_device
:
118 mlogger
.error('Data device not found')
120 "Unexpected error, terminating")
121 if not self
.data_device
.is_lv
:
122 mlogger
.error('Data device isn\'t LVM')
124 "Unexpected error, terminating")
126 self
.old_target_tags
= self
.target_lv
.tags
.copy()
127 self
.old_data_tags
= (
128 self
.data_device
.lv_api
.tags
.copy()
129 if self
.data_device
.is_lv
else None)
131 self
.db_device
.lv_api
.tags
.copy()
132 if self
.db_device
and self
.db_device
.is_lv
else None)
133 self
.old_wal_tags
= (
134 self
.wal_device
.lv_api
.tags
.copy()
135 if self
.wal_device
and self
.wal_device
.is_lv
else None)
137 def update_tags_when_lv_create(self
, create_type
):
139 if not self
.data_device
.is_lv
:
141 'Data device is not LVM, wouldn\'t update LVM tags')
143 tags
["ceph.{}_uuid".format(create_type
)] = self
.target_lv
.lv_uuid
144 tags
["ceph.{}_device".format(create_type
)] = self
.target_lv
.lv_path
145 self
.data_device
.lv_api
.set_tags(tags
)
147 tags
= self
.data_device
.lv_api
.tags
.copy()
148 tags
["ceph.type"] = create_type
149 self
.target_lv
.set_tags(tags
)
152 if create_type
== "db" and self
.wal_device
:
153 aux_dev
= self
.wal_device
154 elif create_type
== "wal" and self
.db_device
:
155 aux_dev
= self
.db_device
158 if not aux_dev
.is_lv
:
160 '{} device is not LVM, wouldn\'t update LVM tags'.format(
161 create_type
.upper()))
164 tags
["ceph.{}_uuid".format(create_type
)] = self
.target_lv
.lv_uuid
165 tags
["ceph.{}_device".format(create_type
)] = self
.target_lv
.lv_path
166 aux_dev
.lv_api
.set_tags(tags
)
168 def remove_lvs(self
, source_devices
, target_type
):
169 remaining_devices
= [self
.data_device
, self
.db_device
, self
.wal_device
]
172 for device
, type in source_devices
:
173 if type == "block" or type == target_type
:
175 remaining_devices
.remove(device
)
177 outdated_tags
.append("ceph.{}_uuid".format(type))
178 outdated_tags
.append("ceph.{}_device".format(type))
179 device
.lv_api
.clear_tags()
180 if len(outdated_tags
) > 0:
181 for d
in remaining_devices
:
183 d
.lv_api
.clear_tags(outdated_tags
)
185 def replace_lvs(self
, source_devices
, target_type
):
186 remaining_devices
= [self
.data_device
]
188 remaining_devices
.append(self
.db_device
)
190 remaining_devices
.append(self
.wal_device
)
193 for device
, type in source_devices
:
196 remaining_devices
.remove(device
)
198 outdated_tags
.append("ceph.{}_uuid".format(type))
199 outdated_tags
.append("ceph.{}_device".format(type))
200 device
.lv_api
.clear_tags()
203 new_tags
["ceph.{}_uuid".format(target_type
)] = self
.target_lv
.lv_uuid
204 new_tags
["ceph.{}_device".format(target_type
)] = self
.target_lv
.lv_path
206 for d
in remaining_devices
:
208 if len(outdated_tags
) > 0:
209 d
.lv_api
.clear_tags(outdated_tags
)
210 d
.lv_api
.set_tags(new_tags
)
212 if not self
.data_device
.is_lv
:
214 'Data device is not LVM, wouldn\'t properly update target LVM tags')
216 tags
= self
.data_device
.lv_api
.tags
.copy()
218 tags
["ceph.type"] = target_type
219 tags
["ceph.{}_uuid".format(target_type
)] = self
.target_lv
.lv_uuid
220 tags
["ceph.{}_device".format(target_type
)] = self
.target_lv
.lv_path
221 self
.target_lv
.set_tags(tags
)
225 'Undoing lv tag set')
227 if self
.old_data_tags
:
228 self
.data_device
.lv_api
.set_tags(self
.old_data_tags
)
230 self
.data_device
.lv_api
.clear_tags()
233 self
.db_device
.lv_api
.set_tags(self
.old_db_tags
)
235 self
.db_device
.lv_api
.clear_tags()
237 if self
.old_wal_tags
:
238 self
.wal_device
.lv_api
.set_tags(self
.old_wal_tags
)
240 self
.wal_device
.lv_api
.clear_tags()
241 if self
.old_target_tags
:
242 self
.target_lv
.set_tags(self
.old_target_tags
)
244 self
.target_lv
.clear_tags()
class Migrate(object):
    """
    ``ceph-volume lvm migrate`` subcommand: moves BlueFS data between an
    OSD's volumes using ceph-bluestore-tool, keeping LVM tags consistent.
    """

    # NOTE: fixed garbled user-facing help string ("from to another")
    help = 'Migrate BlueFS data from one LVM device to another'

    def __init__(self, argv):
        self.argv = argv

    def get_source_devices(self, devices, target_type=""):
        """
        Filter ``devices`` down to those listed in ``--from`` (excluding the
        migration target itself).

        :raises SystemExit: if the resulting list is empty
        """
        ret = []
        for device, type in devices:
            if type == target_type:
                continue
            if type == 'block':
                if 'data' not in self.args.from_:
                    continue
            elif type == 'db':
                if 'db' not in self.args.from_:
                    continue
            elif type == 'wal':
                if 'wal' not in self.args.from_:
                    continue
            ret.append([device, type])
        if ret == []:
            mlogger.error('Source device list is empty')
            raise SystemExit(
                'Unable to migrate to : {}'.format(self.args.target))
        return ret

    # ceph-bluestore-tool uses the following replacement rules
    # (in the order of precedence, stop on the first match):
    # - if source list has DB volume - target device replaces it.
    # - if source list has WAL volume - target device replaces it.
    # - if source list has slow volume only - operation isn't permitted,
    #   requires explicit allocation via new-db/new-wal command.
    def get_target_type_by_source(self, devices):
        """Return 'db' or 'wal' (DB wins), or None if neither is a source."""
        ret = None
        for device, type in devices:
            if type == 'db':
                return 'db'
            elif type == 'wal':
                ret = 'wal'
        return ret

    def get_filename_by_type(self, type):
        """Map a volume type to its symlink name inside the OSD directory."""
        filename = 'block'
        if type == 'db' or type == 'wal':
            filename += '.' + type
        return filename

    def get_source_args(self, osd_path, devices):
        """Build the repeated ``--devs-source`` CLI arguments."""
        ret = []
        for device, type in devices:
            ret = ret + ["--devs-source", os.path.join(
                osd_path, self.get_filename_by_type(type))]
        return ret

    @decorators.needs_root
    def migrate_to_new(self, osd_id, osd_fsid, devices, target_lv):
        """
        Migrate BlueFS data into ``target_lv``, attaching it as a brand new
        db/wal volume; the replacement type is derived from the source list.
        """
        source_devices = self.get_source_devices(devices)
        target_type = self.get_target_type_by_source(source_devices)
        if not target_type:
            mlogger.error(
                "Unable to determine new volume type,"
                " please use new-db or new-wal command before.")
            raise SystemExit(
                "Unable to migrate to : {}".format(self.args.target))

        target_path = target_lv.lv_path

        try:
            tag_tracker = VolumeTagTracker(devices, target_lv)
            # we need to update lvm tags for all the remaining volumes
            # and clear for ones which to be removed

            # ceph-bluestore-tool removes source volume(s) other than block
            # one and attaches target one after successful migration
            tag_tracker.replace_lvs(source_devices, target_type)

            osd_path = get_osd_path(osd_id, osd_fsid)
            source_args = self.get_source_args(osd_path, source_devices)
            mlogger.info("Migrate to new, Source: {} Target: {}".format(
                source_args, target_path))
            stdout, stderr, exit_code = process.call([
                'ceph-bluestore-tool',
                '--path',
                osd_path,
                '--dev-target',
                target_path,
                '--command',
                'bluefs-bdev-migrate'] +
                source_args)
            if exit_code != 0:
                mlogger.error(
                    'Failed to migrate device, error code:{}'.format(exit_code))
                raise SystemExit(
                    'Failed to migrate to : {}'.format(self.args.target))
            else:
                system.chown(os.path.join(osd_path, "block.{}".format(
                    target_type)))
                terminal.success('Migration successful.')
        except:  # noqa: E722 — roll back LVM tags on any failure, re-raise
            tag_tracker.undo()
            raise

        return

    @decorators.needs_root
    def migrate_to_existing(self, osd_id, osd_fsid, devices, target_lv):
        """
        Migrate BlueFS data into ``target_lv``, which is already attached to
        the OSD (its ceph.type tag decides the target file). Migrating onto
        an attached WAL volume is rejected.
        """
        target_type = target_lv.tags["ceph.type"]
        if target_type == "wal":
            mlogger.error("Migrate to WAL is not supported")
            raise SystemExit(
                "Unable to migrate to : {}".format(self.args.target))
        target_filename = self.get_filename_by_type(target_type)
        # defensive: get_filename_by_type currently never returns ''
        if (target_filename == ""):
            mlogger.error(
                "Target Logical Volume doesn't have proper volume type "
                "(ceph.type LVM tag): {}".format(target_type))
            raise SystemExit(
                "Unable to migrate to : {}".format(self.args.target))

        osd_path = get_osd_path(osd_id, osd_fsid)
        source_devices = self.get_source_devices(devices, target_type)
        target_path = os.path.join(osd_path, target_filename)
        tag_tracker = VolumeTagTracker(devices, target_lv)

        try:
            # ceph-bluestore-tool removes source volume(s) other than
            # block and target ones after successful migration
            tag_tracker.remove_lvs(source_devices, target_type)
            source_args = self.get_source_args(osd_path, source_devices)
            mlogger.info("Migrate to existing, Source: {} Target: {}".format(
                source_args, target_path))
            stdout, stderr, exit_code = process.call([
                'ceph-bluestore-tool',
                '--path',
                osd_path,
                '--dev-target',
                target_path,
                '--command',
                'bluefs-bdev-migrate'] +
                source_args)
            if exit_code != 0:
                mlogger.error(
                    'Failed to migrate device, error code:{}'.format(exit_code))
                raise SystemExit(
                    'Failed to migrate to : {}'.format(self.args.target))
            else:
                terminal.success('Migration successful.')
        except:  # noqa: E722 — roll back LVM tags on any failure, re-raise
            tag_tracker.undo()
            raise

        return

    @decorators.needs_root
    def migrate_osd(self):
        """Validate arguments/target LV and dispatch to new/existing flow."""
        if self.args.osd_id and not self.args.no_systemd:
            osd_is_running = systemctl.osd_is_active(self.args.osd_id)
            if osd_is_running:
                mlogger.error('OSD is running, stop it with: '
                              'systemctl stop ceph-osd@{}'.format(
                                  self.args.osd_id))
                raise SystemExit(
                    'Unable to migrate devices associated with OSD ID: {}'
                    .format(self.args.osd_id))

        target_lv = api.get_lv_by_fullname(self.args.target)
        if not target_lv:
            mlogger.error(
                'Target path "{}" is not a Logical Volume'.format(
                    self.args.target))
            raise SystemExit(
                'Unable to migrate to : {}'.format(self.args.target))
        devices = find_associated_devices(self.args.osd_id,
                                          self.args.osd_fsid)
        if (not target_lv.used_by_ceph):
            self.migrate_to_new(self.args.osd_id, self.args.osd_fsid,
                                devices,
                                target_lv)
        else:
            # an already-attached target must belong to this very OSD
            if (target_lv.tags['ceph.osd_id'] != self.args.osd_id or
                    target_lv.tags['ceph.osd_fsid'] != self.args.osd_fsid):
                mlogger.error(
                    'Target Logical Volume isn\'t used by the specified OSD: '
                    '{} FSID: {}'.format(self.args.osd_id,
                                         self.args.osd_fsid))
                raise SystemExit(
                    'Unable to migrate to : {}'.format(self.args.target))

            self.migrate_to_existing(self.args.osd_id, self.args.osd_fsid,
                                     devices,
                                     target_lv)

    def make_parser(self, prog, sub_command_help):
        """Build the argparse parser used by ``main``."""
        parser = argparse.ArgumentParser(
            prog=prog,
            formatter_class=argparse.RawDescriptionHelpFormatter,
            description=sub_command_help,
        )

        parser.add_argument(
            '--osd-id',
            required=True,
            help='Specify an OSD ID to detect associated devices for zapping',
            type=valid_osd_id,
        )
        parser.add_argument(
            '--osd-fsid',
            required=True,
            help='Specify an OSD FSID to detect associated devices for zapping',
        )
        parser.add_argument(
            '--target',
            required=True,
            help='Specify target Logical Volume (LV) to migrate data to',
        )
        parser.add_argument(
            '--from',
            nargs='*',
            dest='from_',
            required=True,
            choices=['data', 'db', 'wal'],
            help='Copy BlueFS data from DB device',
        )
        parser.add_argument(
            '--no-systemd',
            dest='no_systemd',
            action='store_true',
            help='Skip checking OSD systemd unit',
        )
        return parser

    def main(self):
        sub_command_help = dedent("""
        Moves BlueFS data from source volume(s) to the target one, source
        volumes (except the main (i.e. data or block) one) are removed on
        success. LVM volumes are permitted for Target only, both already
        attached or new logical one. In the latter case it is attached to OSD
        replacing one of the source devices. Following replacement rules apply
        (in the order of precedence, stop on the first match):
        * if source list has DB volume - target device replaces it.
        * if source list has WAL volume - target device replace it.
        * if source list has slow volume only - operation is not permitted,
          requires explicit allocation via new-db/new-wal command.

        Example calls for supported scenarios:

          Moves BlueFS data from main device to LV already attached as DB:

            ceph-volume lvm migrate --osd-id 1 --osd-fsid <uuid> --from data --target vgname/db

          Moves BlueFS data from shared main device to LV which will be attached
          as a new DB:

            ceph-volume lvm migrate --osd-id 1 --osd-fsid <uuid> --from data --target vgname/new_db

          Moves BlueFS data from DB device to new LV, DB is replaced:

            ceph-volume lvm migrate --osd-id 1 --osd-fsid <uuid> --from db --target vgname/new_db

          Moves BlueFS data from main and DB devices to new LV, DB is replaced:

            ceph-volume lvm migrate --osd-id 1 --osd-fsid <uuid> --from data db --target vgname/new_db

          Moves BlueFS data from main, DB and WAL devices to new LV, WAL is
          removed and DB is replaced:

            ceph-volume lvm migrate --osd-id 1 --osd-fsid <uuid> --from data db wal --target vgname/new_db

          Moves BlueFS data from main, DB and WAL devices to main device, WAL
          and DB are removed:

            ceph-volume lvm migrate --osd-id 1 --osd-fsid <uuid> --from db wal --target vgname/data

        """)

        parser = self.make_parser('ceph-volume lvm migrate', sub_command_help)

        if len(self.argv) == 0:
            print(sub_command_help)
            return

        self.args = parser.parse_args(self.argv)

        self.migrate_osd()
class NewVolume(object):
    """
    Base class for the ``new-db``/``new-wal`` subcommands: attaches a new LV
    of ``create_type`` ('db' or 'wal') to an existing OSD.
    """

    def __init__(self, create_type, argv):
        self.create_type = create_type
        self.argv = argv

    def make_parser(self, prog, sub_command_help):
        """Build the argparse parser shared by the new-db/new-wal commands."""
        parser = argparse.ArgumentParser(
            prog=prog,
            formatter_class=argparse.RawDescriptionHelpFormatter,
            description=sub_command_help,
        )

        parser.add_argument(
            '--osd-id',
            required=True,
            help='Specify an OSD ID to attach new volume to',
            type=valid_osd_id,
        )
        parser.add_argument(
            '--osd-fsid',
            required=True,
            # NOTE: fixed help-text typo "FSIDto"
            help='Specify an OSD FSID to attach new volume to',
        )
        parser.add_argument(
            '--target',
            required=True,
            help='Specify target Logical Volume (LV) to attach',
        )
        parser.add_argument(
            '--no-systemd',
            dest='no_systemd',
            action='store_true',
            help='Skip checking OSD systemd unit',
        )
        return parser

    @decorators.needs_root
    def make_new_volume(self, osd_id, osd_fsid, devices, target_lv):
        """
        Attach ``target_lv`` to the OSD as a new db/wal volume via
        ceph-bluestore-tool, updating LVM tags first and rolling them back on
        any failure.
        """
        osd_path = get_osd_path(osd_id, osd_fsid)
        mlogger.info(
            'Making new volume at {} for OSD: {} ({})'.format(
                target_lv.lv_path, osd_id, osd_path))
        tag_tracker = VolumeTagTracker(devices, target_lv)

        try:
            tag_tracker.update_tags_when_lv_create(self.create_type)

            stdout, stderr, exit_code = process.call([
                'ceph-bluestore-tool',
                '--path',
                osd_path,
                '--dev-target',
                target_lv.lv_path,
                '--command',
                'bluefs-bdev-new-{}'.format(self.create_type)
            ])
            if exit_code != 0:
                mlogger.error(
                    'failed to attach new volume, error code:{}'.format(
                        exit_code))
                raise SystemExit(
                    "Failed to attach new volume: {}".format(
                        self.args.target))
            else:
                system.chown(os.path.join(osd_path, "block.{}".format(
                    self.create_type)))
                terminal.success('New volume attached.')
        except:  # noqa: E722 — roll back LVM tags on any failure, re-raise
            tag_tracker.undo()
            raise
        return

    @decorators.needs_root
    def new_volume(self):
        """Validate arguments and target LV, then attach the new volume."""
        if self.args.osd_id and not self.args.no_systemd:
            osd_is_running = systemctl.osd_is_active(self.args.osd_id)
            if osd_is_running:
                mlogger.error('OSD ID is running, stop it with:'
                              ' systemctl stop ceph-osd@{}'.format(
                                  self.args.osd_id))
                raise SystemExit(
                    'Unable to attach new volume for OSD: {}'.format(
                        self.args.osd_id))

        target_lv = api.get_lv_by_fullname(self.args.target)
        if not target_lv:
            mlogger.error(
                'Target path {} is not a Logical Volume'.format(
                    self.args.target))
            raise SystemExit(
                'Unable to attach new volume : {}'.format(self.args.target))
        if target_lv.used_by_ceph:
            mlogger.error(
                'Target Logical Volume is already used by ceph: {}'.format(
                    self.args.target))
            raise SystemExit(
                'Unable to attach new volume : {}'.format(self.args.target))
        else:
            devices = find_associated_devices(self.args.osd_id,
                                              self.args.osd_fsid)
            self.make_new_volume(
                self.args.osd_id,
                self.args.osd_fsid,
                devices,
                target_lv)
class NewWAL(NewVolume):
    """``ceph-volume lvm new-wal``: attach a new WAL LV to an OSD."""

    help = 'Allocate new WAL volume for OSD at specified Logical Volume'

    def __init__(self, argv):
        super(NewWAL, self).__init__("wal", argv)

    def main(self):
        sub_command_help = dedent("""
        Attaches the given logical volume to the given OSD as a WAL volume.
        Logical volume format is vg/lv. Fails if OSD has already got attached DB.

        Example:

          Attach vgname/lvname as a WAL volume to OSD 1

              ceph-volume lvm new-wal --osd-id 1 --osd-fsid 55BD4219-16A7-4037-BC20-0F158EFCC83D --target vgname/new_wal
        """)
        parser = self.make_parser('ceph-volume lvm new-wal', sub_command_help)

        if len(self.argv) == 0:
            print(sub_command_help)
            return

        self.args = parser.parse_args(self.argv)

        self.new_volume()
class NewDB(NewVolume):
    """``ceph-volume lvm new-db``: attach a new DB LV to an OSD."""

    help = 'Allocate new DB volume for OSD at specified Logical Volume'

    def __init__(self, argv):
        super(NewDB, self).__init__("db", argv)

    def main(self):
        sub_command_help = dedent("""
        Attaches the given logical volume to the given OSD as a DB volume.
        Logical volume format is vg/lv. Fails if OSD has already got attached DB.

        Example:

          Attach vgname/lvname as a DB volume to OSD 1

              ceph-volume lvm new-db --osd-id 1 --osd-fsid 55BD4219-16A7-4037-BC20-0F158EFCC83D --target vgname/new_db
        """)
        parser = self.make_parser('ceph-volume lvm new-db', sub_command_help)

        if len(self.argv) == 0:
            print(sub_command_help)
            return

        self.args = parser.parse_args(self.argv)

        self.new_volume()