]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | import datetime |
2 | import json | |
3 | import logging | |
4 | import time | |
5 | ||
6 | from typing import List, Dict, Any, Set, Union | |
7 | ||
8 | import orchestrator | |
9 | from orchestrator import OrchestratorError | |
10 | ||
# Module-level logger used for OSD removal progress and status messages.
logger = logging.getLogger(__name__)
12 | ||
13 | ||
class OSDRemoval(object):
    """Bookkeeping record for a single OSD that is queued for removal.

    Equality and hashing are based solely on ``osd_id``, so a set of
    ``OSDRemoval`` objects holds at most one entry per OSD regardless of
    how the remaining (mutable) attributes change over time.
    """

    def __init__(self,
                 osd_id: str,
                 replace: bool,
                 force: bool,
                 nodename: str,
                 fullname: str,
                 start_at: datetime.datetime,
                 pg_count: int):
        """
        :param osd_id: id of the OSD; the only field used for identity
        :param replace: whether the OSD is being removed for replacement
        :param force: whether the removal was requested with force
        :param nodename: name of the node the OSD daemon lives on
        :param fullname: full daemon name of the OSD
        :param start_at: when the removal was requested
            (stored as ``started_at``)
        :param pg_count: last known PG count; negative means "unknown"
        """
        self.osd_id = osd_id
        self.replace = replace
        self.force = force
        self.nodename = nodename
        self.fullname = fullname
        self.started_at = start_at
        self.pg_count = pg_count

    # needed due to changing 'started_at' attr
    def __eq__(self, other):
        # Foreign types are simply "not equal"; returning NotImplemented lets
        # Python fall back to its default comparison instead of failing with
        # an AttributeError on `other.osd_id`.
        if not isinstance(other, OSDRemoval):
            return NotImplemented
        return self.osd_id == other.osd_id

    def __hash__(self):
        # Must stay consistent with __eq__: hash only what is compared.
        return hash(self.osd_id)

    def __repr__(self):
        return (f'<OSDRemoval>(osd_id={self.osd_id}, replace={self.replace}, '
                f'force={self.force}, nodename={self.nodename}'
                f', fullname={self.fullname}, started_at={self.started_at}, '
                f'pg_count={self.pg_count})')

    @property
    def pg_count_str(self) -> str:
        """PG count as text; a negative count is rendered as ``'n/a'``."""
        return 'n/a' if self.pg_count < 0 else str(self.pg_count)
47 | ||
48 | ||
class RemoveUtil(object):
    """Keeps the queue of OSDs awaiting removal and drives each one through
    the removal steps by issuing mon commands via ``mgr``.
    """

    def __init__(self, mgr):
        """
        :param mgr: manager object; this class uses its ``mon_command()``,
            ``log`` and ``_remove_daemon()`` members
        """
        self.mgr = mgr
        # OSDs still waiting to be removed; entries are keyed by their osd_id.
        self.to_remove_osds: Set[OSDRemoval] = set()
        self.osd_removal_report: Dict[OSDRemoval, Union[int, str]] = dict()

    @property
    def report(self) -> Set[OSDRemoval]:
        """Shallow copy of the removal queue (safe for the caller to mutate)."""
        return self.to_remove_osds.copy()

    def queue_osds_for_removal(self, osds: Set[OSDRemoval]):
        """Add ``osds`` to the removal queue."""
        self.to_remove_osds.update(osds)

    def _remove_osds_bg(self) -> None:
        """
        Performs actions in the _serve() loop to remove an OSD
        when criteria is met.

        OSDs that are not yet empty or not yet safe to destroy are skipped
        and stay queued for a later pass.

        :raises OrchestratorError: if an OSD cannot be marked down, destroyed
            or purged. The OSD stays queued and the remaining OSDs are not
            processed in this pass.
        """
        logger.debug(
            f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}")
        self._update_osd_removal_status()
        # Iterate over a copy: finished OSDs are removed from the queue below.
        remove_osds: set = self.to_remove_osds.copy()
        for osd in remove_osds:
            if not osd.force:
                self.drain_osd(osd.osd_id)
                # skip criteria
                if not self.is_empty(osd.osd_id):
                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
                    continue

            # NOTE(review): nesting reconstructed from a flattened listing;
            # confirm that the safe-to-destroy check is meant to apply to
            # forced removals as well.
            if not self.ok_to_destroy([osd.osd_id]):
                logger.info(
                    f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
                continue

            # abort criteria
            if not self.down_osd([osd.osd_id]):
                # also remove it from the remove_osd list and set a health_check warning?
                raise orchestrator.OrchestratorError(
                    f"Could not set OSD <{osd.osd_id}> to 'down'")

            if osd.replace:
                if not self.destroy_osd(osd.osd_id):
                    # also remove it from the remove_osd list and set a health_check warning?
                    raise orchestrator.OrchestratorError(
                        f"Could not destroy OSD <{osd.osd_id}>")
            else:
                if not self.purge_osd(osd.osd_id):
                    # also remove it from the remove_osd list and set a health_check warning?
                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")

            self.mgr._remove_daemon(osd.fullname, osd.nodename)
            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}")
            logger.debug(f"Removing {osd.osd_id} from the queue.")
            self.to_remove_osds.remove(osd)

    def _update_osd_removal_status(self):
        """
        Generate a OSD report that can be printed to the CLI

        Refreshes ``pg_count`` on every queued OSD.

        :raises OrchestratorError: if the drain status cannot be queried
        """
        logger.debug("Update OSD removal status")
        if self.to_remove_osds:
            # One reply is scanned for every queued OSD, so query once per
            # pass instead of once per OSD. Nothing is queried when the
            # queue is empty.
            drain_status = self._query_drain_status()
            for osd in self.to_remove_osds:
                osd.pg_count = self._pg_count_from_status(drain_status, osd.osd_id)
        logger.debug(f"OSD removal status: {self.to_remove_osds}")

    def drain_osd(self, osd_id: str) -> bool:
        """
        Uses `osd_support` module to schedule a drain operation of an OSD

        :return: True if the command succeeded, False otherwise
        """
        cmd_args = {
            'prefix': 'osd drain',
            'osd_ids': [int(osd_id)]
        }
        return self._run_mon_cmd(cmd_args)

    def _query_drain_status(self) -> list:
        """Run `osd drain status` once and return its JSON-decoded output.

        :raises OrchestratorError: if the mon command returns non-zero
        """
        self.mgr.log.debug("Querying for drain status")
        ret, out, err = self.mgr.mon_command({
            'prefix': 'osd drain status',
        })
        if ret != 0:
            self.mgr.log.error(f"Calling osd drain status failed with {err}")
            raise OrchestratorError("Could not query `osd drain status`")
        return json.loads(out)

    @staticmethod
    def _pg_count_from_status(drain_status: list, osd_id: Union[int, str]) -> int:
        """Return the 'pgs' value of the first entry matching ``osd_id``.

        Ids are compared as strings. Returns -1 when no entry matches or the
        matching entry has no 'pgs' key.
        """
        wanted = str(osd_id)
        for entry in drain_status:
            if str(entry.get('osd_id', '')) == wanted:
                return int(entry.get('pgs', -1))
        return -1

    def get_pg_count(self, osd_id: str) -> int:
        """ Queries for PG count of an OSD

        :return: the PG count, or -1 if the OSD is not listed in the
            drain status
        :raises OrchestratorError: if the drain status cannot be queried
        """
        return self._pg_count_from_status(self._query_drain_status(), osd_id)

    def is_empty(self, osd_id: str) -> bool:
        """ Checks if an OSD is empty (its reported PG count is exactly 0) """
        return self.get_pg_count(osd_id) == 0

    def ok_to_destroy(self, osd_ids: List[str]) -> bool:
        """ Queries the safe-to-destroy flag for OSDs

        The ids are forwarded to the command unchanged.

        :return: True if the command succeeded, False otherwise
        """
        cmd_args = {'prefix': 'osd safe-to-destroy',
                    'ids': osd_ids}
        return self._run_mon_cmd(cmd_args)

    def destroy_osd(self, osd_id: Union[int, str]) -> bool:
        """ Destroys an OSD (forcefully)

        :return: True if the command succeeded, False otherwise
        """
        cmd_args = {'prefix': 'osd destroy-actual',
                    'id': int(osd_id),
                    'yes_i_really_mean_it': True}
        return self._run_mon_cmd(cmd_args)

    def down_osd(self, osd_ids: List[str]) -> bool:
        """ Sets `down` flag to OSDs

        The ids are forwarded to the command unchanged.

        :return: True if the command succeeded, False otherwise
        """
        cmd_args = {
            'prefix': 'osd down',
            'ids': osd_ids,
        }
        return self._run_mon_cmd(cmd_args)

    def purge_osd(self, osd_id: Union[int, str]) -> bool:
        """ Purges an OSD from the cluster (forcefully)

        :return: True if the command succeeded, False otherwise
        """
        cmd_args = {
            'prefix': 'osd purge-actual',
            'id': int(osd_id),
            'yes_i_really_mean_it': True
        }
        return self._run_mon_cmd(cmd_args)

    def out_osd(self, osd_ids: List[str]) -> bool:
        """ Sets `out` flag to OSDs

        The ids are forwarded to the command unchanged.

        :return: True if the command succeeded, False otherwise
        """
        cmd_args = {
            'prefix': 'osd out',
            'ids': osd_ids,
        }
        return self._run_mon_cmd(cmd_args)

    def _run_mon_cmd(self, cmd_args: dict) -> bool:
        """
        Generic command to run mon_command and evaluate/log the results

        :return: True if the command returned 0; False otherwise (the
            failure is logged, no exception is raised)
        """
        ret, out, err = self.mgr.mon_command(cmd_args)
        if ret != 0:
            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
            self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
            return False
        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
        return True