import os
import json
import errno
import logging
import random
import time

from io import StringIO
from collections import deque

from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.exceptions import CommandFailedError
from teuthology.contextutil import safe_while

log = logging.getLogger(__name__)

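# This suite exercises CephFS snapshot mirroring end to end: it toggles
# the mgr "mirroring" module, manages peers and mirrored directories via
# "ceph fs snapshot mirror ..." commands, and cross-checks daemon state
# over the cephfs-mirror admin socket.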
class TestMirroring(CephFSTestCase):
    MDSS_REQUIRED = 5
    CLIENTS_REQUIRED = 2
    REQUIRE_BACKUP_FILESYSTEM = True

    MODULE_NAME = "mirroring"

    def setUp(self):
        super(TestMirroring, self).setUp()
        self.primary_fs_name = self.fs.name
        self.primary_fs_id = self.fs.id
        self.secondary_fs_name = self.backup_fs.name
        self.secondary_fs_id = self.backup_fs.id
        self.enable_mirroring_module()

    def tearDown(self):
        self.disable_mirroring_module()
        super(TestMirroring, self).tearDown()

    def enable_mirroring_module(self):
        self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "enable", TestMirroring.MODULE_NAME)

    def disable_mirroring_module(self):
        self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "disable", TestMirroring.MODULE_NAME)

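    # The next two helpers wrap the mon commands
    #   ceph fs snapshot mirror enable <fs_name>
    #   ceph fs snapshot mirror disable <fs_name>
    # and then confirm the daemon's view over its admin socket, roughly:
    #   ceph --admin-daemon <asok> fs mirror status <fs_name>@<fs_id>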
    def enable_mirroring(self, fs_name, fs_id):
        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "enable", fs_name)
        time.sleep(10)
        # verify via asok
        res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
                                         'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
        self.assertTrue(res['peers'] == {})
        self.assertTrue(res['snap_dirs']['dir_count'] == 0)

    def disable_mirroring(self, fs_name, fs_id):
        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "disable", fs_name)
        time.sleep(10)
        # verify via asok
        try:
            self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
                                       'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
        except CommandFailedError:
            pass
        else:
            raise RuntimeError('expected admin socket to be unavailable')

    def verify_peer_added(self, fs_name, fs_id, peer_spec, remote_fs_name=None):
        # verify via asok
        res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
                                         'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
        peer_uuid = self.get_peer_uuid(peer_spec)
        self.assertTrue(peer_uuid in res['peers'])
        client_name = res['peers'][peer_uuid]['remote']['client_name']
        cluster_name = res['peers'][peer_uuid]['remote']['cluster_name']
        self.assertTrue(peer_spec == f'{client_name}@{cluster_name}')
        if remote_fs_name:
            self.assertTrue(self.secondary_fs_name == res['peers'][peer_uuid]['remote']['fs_name'])
        else:
            self.assertTrue(self.primary_fs_name == res['peers'][peer_uuid]['remote']['fs_name'])

    def peer_add(self, fs_name, fs_id, peer_spec, remote_fs_name=None):
        if remote_fs_name:
            self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_add", fs_name, peer_spec, remote_fs_name)
        else:
            self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_add", fs_name, peer_spec)
        time.sleep(10)
        self.verify_peer_added(fs_name, fs_id, peer_spec, remote_fs_name)

    def peer_remove(self, fs_name, fs_id, peer_spec):
        peer_uuid = self.get_peer_uuid(peer_spec)
        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_remove", fs_name, peer_uuid)
        time.sleep(10)
        # verify via asok
        res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
                                         'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
        self.assertTrue(res['peers'] == {} and res['snap_dirs']['dir_count'] == 0)

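    # The bootstrap helpers below follow the peer bootstrap flow, roughly:
    #   token=$(ceph fs snapshot mirror peer_bootstrap create <fs_name> <client> <site>)
    #   ceph fs snapshot mirror peer_bootstrap import <fs_name> $token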
    def bootstrap_peer(self, fs_name, client_name, site_name):
        outj = json.loads(self.mgr_cluster.mon_manager.raw_cluster_cmd(
            "fs", "snapshot", "mirror", "peer_bootstrap", "create", fs_name, client_name, site_name))
        return outj['token']

    def import_peer(self, fs_name, token):
        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_bootstrap", "import",
                                                     fs_name, token)

    def add_directory(self, fs_name, fs_id, dir_name):
        # get initial dir count
        res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
                                         'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
        dir_count = res['snap_dirs']['dir_count']
        log.debug(f'initial dir_count={dir_count}')

        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "add", fs_name, dir_name)

        time.sleep(10)
        # verify via asok
        res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
                                         'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
        new_dir_count = res['snap_dirs']['dir_count']
        log.debug(f'new dir_count={new_dir_count}')
        self.assertTrue(new_dir_count > dir_count)

    def remove_directory(self, fs_name, fs_id, dir_name):
        # get initial dir count
        res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
                                         'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
        dir_count = res['snap_dirs']['dir_count']
        log.debug(f'initial dir_count={dir_count}')

        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "remove", fs_name, dir_name)

        time.sleep(10)
        # verify via asok
        res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
                                         'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
        new_dir_count = res['snap_dirs']['dir_count']
        log.debug(f'new dir_count={new_dir_count}')
        self.assertTrue(new_dir_count < dir_count)

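    # The check_peer_* helpers below read per-directory sync state from the
    # daemon's "fs mirror peer status <fs_name>@<fs_id> <peer_uuid>" admin
    # socket command; 'snaps_synced', 'snaps_deleted' and 'snaps_renamed'
    # are counters the tests below expect to accumulate per directory.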
    def check_peer_status(self, fs_name, fs_id, peer_spec, dir_name, expected_snap_name,
                          expected_snap_count):
        peer_uuid = self.get_peer_uuid(peer_spec)
        res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
                                         'fs', 'mirror', 'peer', 'status',
                                         f'{fs_name}@{fs_id}', peer_uuid)
        self.assertTrue(dir_name in res)
        self.assertTrue(res[dir_name]['last_synced_snap']['name'] == expected_snap_name)
        self.assertTrue(res[dir_name]['snaps_synced'] == expected_snap_count)

    def check_peer_status_deleted_snap(self, fs_name, fs_id, peer_spec, dir_name,
                                       expected_delete_count):
        peer_uuid = self.get_peer_uuid(peer_spec)
        res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
                                         'fs', 'mirror', 'peer', 'status',
                                         f'{fs_name}@{fs_id}', peer_uuid)
        self.assertTrue(dir_name in res)
        self.assertTrue(res[dir_name]['snaps_deleted'] == expected_delete_count)

    def check_peer_status_renamed_snap(self, fs_name, fs_id, peer_spec, dir_name,
                                       expected_rename_count):
        peer_uuid = self.get_peer_uuid(peer_spec)
        res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
                                         'fs', 'mirror', 'peer', 'status',
                                         f'{fs_name}@{fs_id}', peer_uuid)
        self.assertTrue(dir_name in res)
        self.assertTrue(res[dir_name]['snaps_renamed'] == expected_rename_count)

    def check_peer_snap_in_progress(self, fs_name, fs_id,
                                    peer_spec, dir_name, snap_name):
        peer_uuid = self.get_peer_uuid(peer_spec)
        res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
                                         'fs', 'mirror', 'peer', 'status',
                                         f'{fs_name}@{fs_id}', peer_uuid)
        self.assertTrue('syncing' == res[dir_name]['state'])
        self.assertTrue(res[dir_name]['current_sycning_snap']['name'] == snap_name)

    def verify_snapshot(self, dir_name, snap_name):
        snap_list = self.mount_b.ls(path=f'{dir_name}/.snap')
        self.assertTrue(snap_name in snap_list)

        source_res = self.mount_a.dir_checksum(path=f'{dir_name}/.snap/{snap_name}',
                                               follow_symlinks=True)
        log.debug(f'source snapshot checksum {snap_name} {source_res}')

        dest_res = self.mount_b.dir_checksum(path=f'{dir_name}/.snap/{snap_name}',
                                             follow_symlinks=True)
        log.debug(f'destination snapshot checksum {snap_name} {dest_res}')
        self.assertTrue(source_res == dest_res)

    def verify_failed_directory(self, fs_name, fs_id, peer_spec, dir_name):
        peer_uuid = self.get_peer_uuid(peer_spec)
        res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
                                         'fs', 'mirror', 'peer', 'status',
                                         f'{fs_name}@{fs_id}', peer_uuid)
        self.assertTrue('failed' == res[dir_name]['state'])

    def get_peer_uuid(self, peer_spec):
        status = self.fs.status()
        fs_map = status.get_fsmap_byname(self.primary_fs_name)
        peers = fs_map['mirror_info']['peers']
        for peer_uuid, mirror_info in peers.items():
            client_name = mirror_info['remote']['client_name']
            cluster_name = mirror_info['remote']['cluster_name']
            remote_peer_spec = f'{client_name}@{cluster_name}'
            if peer_spec == remote_peer_spec:
                return peer_uuid
        return None

    def get_daemon_admin_socket(self):
        """overloaded by teuthology override (fs/mirror/clients/mirror.yaml)"""
        return "/var/run/ceph/cephfs-mirror.asok"

    def get_mirror_daemon_pid(self):
        """pid file overloaded in fs/mirror/clients/mirror.yaml"""
        return self.mount_a.run_shell(['cat', '/var/run/ceph/cephfs-mirror.pid']).stdout.getvalue().strip()

    def get_mirror_rados_addr(self, fs_name, fs_id):
        """return the rados addr used by cephfs-mirror instance"""
        res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
                                         'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
        return res['rados_inst']

    def get_blocklisted_instances(self):
        return json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd(
            "osd", "dump", "--format=json-pretty"))['blocklist']

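    # 'osd dump' reports blocklisted client addresses under its 'blocklist'
    # key; the blocklist tests below match the mirror daemon's rados_inst
    # against those entries.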
    def mirror_daemon_command(self, cmd_label, *args):
        asok_path = self.get_daemon_admin_socket()
        try:
            # use mount_a's remote to execute command
            p = self.mount_a.client_remote.run(
                args=['ceph', '--admin-daemon', asok_path] + list(args),
                stdout=StringIO(), stderr=StringIO(), timeout=30,
                check_status=True, label=cmd_label)
            p.wait()
        except CommandFailedError as ce:
            log.warning(f'mirror daemon command with label "{cmd_label}" failed: {ce}')
            raise
        res = p.stdout.getvalue().strip()
        log.debug(f'command returned={res}')
        return json.loads(res)

    def get_mirror_daemon_status(self, fs_name, fs_id):
        daemon_status = json.loads(self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "daemon", "status", fs_name))
        log.debug(f'daemon_status: {daemon_status}')
        # running a single mirror daemon is supported
        status = daemon_status[0]
        log.debug(f'status: {status}')
        return status

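    # As consumed by test_cephfs_mirror_service_daemon_status below, the
    # returned status JSON looks roughly like:
    #   {"filesystems": [{"directory_count": N,
    #                     "peers": [{"stats": {"failure_count": F,
    #                                          "recovery_count": R}}]}]}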
    def test_basic_mirror_commands(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_mirror_peer_commands(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)

        # add peer
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
        # remove peer
        self.peer_remove(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph")

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_mirror_disable_with_peer(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)

        # add peer
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_matching_peer(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)

        try:
            self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph")
        except CommandFailedError as ce:
            if ce.exitstatus != errno.EINVAL:
                raise RuntimeError('invalid errno when adding a matching remote peer')
        else:
            raise RuntimeError('adding a peer matching local spec should fail')

        # verify via asok -- nothing should get added
        res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
                                         'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
        self.assertTrue(res['peers'] == {})

        # and explicitly specifying the spec (via filesystem name) should fail too
        try:
            self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.primary_fs_name)
        except CommandFailedError as ce:
            if ce.exitstatus != errno.EINVAL:
                raise RuntimeError('invalid errno when adding a matching remote peer')
        else:
            raise RuntimeError('adding a peer matching local spec should fail')

        # verify via asok -- nothing should get added
        res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
                                         'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
        self.assertTrue(res['peers'] == {})

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_mirror_peer_add_existing(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)

        # add peer
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        # adding the same peer should be idempotent
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        # remove peer
        self.peer_remove(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph")

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_peer_commands_with_mirroring_disabled(self):
        # try adding peer when mirroring is not enabled
        try:
            self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
        except CommandFailedError as ce:
            if ce.exitstatus != errno.EINVAL:
                raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a peer')
        else:
            raise RuntimeError(-errno.EINVAL, 'expected peer_add to fail')

        # try removing peer
        try:
            self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_remove", self.primary_fs_name, 'dummy-uuid')
        except CommandFailedError as ce:
            if ce.exitstatus != errno.EINVAL:
                raise RuntimeError(-errno.EINVAL, 'incorrect error code when removing a peer')
        else:
            raise RuntimeError(-errno.EINVAL, 'expected peer_remove to fail')

    def test_add_directory_with_mirroring_disabled(self):
        # try adding a directory when mirroring is not enabled
        try:
            self.add_directory(self.primary_fs_name, self.primary_fs_id, "/d1")
        except CommandFailedError as ce:
            if ce.exitstatus != errno.EINVAL:
                raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a directory')
        else:
            raise RuntimeError(-errno.EINVAL, 'expected directory add to fail')

    def test_directory_commands(self):
        self.mount_a.run_shell(["mkdir", "d1"])
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1')
        try:
            self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1')
        except CommandFailedError as ce:
            if ce.exitstatus != errno.EEXIST:
                raise RuntimeError(-errno.EINVAL, 'incorrect error code when re-adding a directory')
        else:
            raise RuntimeError(-errno.EINVAL, 'expected directory add to fail')
        self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d1')
        try:
            self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d1')
        except CommandFailedError as ce:
            if ce.exitstatus not in (errno.ENOENT, errno.EINVAL):
                raise RuntimeError(-errno.EINVAL, 'incorrect error code when re-deleting a directory')
        else:
            raise RuntimeError(-errno.EINVAL, 'expected directory removal to fail')
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.mount_a.run_shell(["rmdir", "d1"])

    def test_add_relative_directory_path(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        try:
            self.add_directory(self.primary_fs_name, self.primary_fs_id, './d1')
        except CommandFailedError as ce:
            if ce.exitstatus != errno.EINVAL:
                raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a relative path dir')
        else:
            raise RuntimeError(-errno.EINVAL, 'expected directory add to fail')
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_add_directory_path_normalization(self):
        self.mount_a.run_shell(["mkdir", "-p", "d1/d2/d3"])
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1/d2/d3')
        def check_add_command_failure(dir_path):
            try:
                self.add_directory(self.primary_fs_name, self.primary_fs_id, dir_path)
            except CommandFailedError as ce:
                if ce.exitstatus != errno.EEXIST:
                    raise RuntimeError(-errno.EINVAL, 'incorrect error code when re-adding a directory')
            else:
                raise RuntimeError(-errno.EINVAL, 'expected directory add to fail')

        # every path below normalizes to /d1/d2/d3
        check_add_command_failure('/d1/d2/././././././d3')
        check_add_command_failure('/d1/d2/././././././d3//////')
        check_add_command_failure('/d1/d2/../d2/././././d3')
        check_add_command_failure('/././././d1/./././d2/./././d3//////')
        check_add_command_failure('/./d1/./d2/./d3/../../../d1/d2/d3')

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.mount_a.run_shell(["rm", "-rf", "d1"])

    def test_add_ancestor_and_child_directory(self):
        self.mount_a.run_shell(["mkdir", "-p", "d1/d2/d3"])
        self.mount_a.run_shell(["mkdir", "-p", "d1/d4"])
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1/d2/')
        def check_add_command_failure(dir_path):
            try:
                self.add_directory(self.primary_fs_name, self.primary_fs_id, dir_path)
            except CommandFailedError as ce:
                if ce.exitstatus != errno.EINVAL:
                    raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a directory')
            else:
                raise RuntimeError(-errno.EINVAL, 'expected directory add to fail')

        # cannot add ancestors or a subtree for an existing directory
        check_add_command_failure('/')
        check_add_command_failure('/d1')
        check_add_command_failure('/d1/d2/d3')

        # obviously, one can add a non-ancestor or non-subtree
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1/d4/')

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.mount_a.run_shell(["rm", "-rf", "d1"])

    def test_cephfs_mirror_blocklist(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)

        # add peer
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
                                         'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
        peers_1 = set(res['peers'])

        # fetch rados address for blocklist check
        rados_inst = self.get_mirror_rados_addr(self.primary_fs_name, self.primary_fs_id)

        # simulate non-responding mirror daemon by sending SIGSTOP
        pid = self.get_mirror_daemon_pid()
        log.debug(f'SIGSTOP to cephfs-mirror pid {pid}')
        self.mount_a.run_shell(['kill', '-SIGSTOP', pid])

        # wait for blocklist timeout -- the manager module would blocklist
        # the mirror daemon
        time.sleep(40)

        # wake up the mirror daemon -- at this point, the daemon should know
        # that it has been blocklisted
        log.debug('SIGCONT to cephfs-mirror')
        self.mount_a.run_shell(['kill', '-SIGCONT', pid])

        # check if the rados addr is blocklisted
        blocklist = self.get_blocklisted_instances()
        self.assertTrue(rados_inst in blocklist)

        # wait enough so that the mirror daemon restarts blocklisted instances
        time.sleep(40)
        rados_inst_new = self.get_mirror_rados_addr(self.primary_fs_name, self.primary_fs_id)

        # and we should get a new rados instance
        self.assertTrue(rados_inst != rados_inst_new)

        # along with peers that were added
        res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
                                         'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
        peers_2 = set(res['peers'])
        self.assertEqual(peers_1, peers_2)

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

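    # Several sync tests below share a preamble: widen mount_b's client caps
    # to the backup filesystem's pools and remount mount_b on the secondary
    # filesystem so synchronized snapshots can be inspected there.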
    def test_cephfs_mirror_stats(self):
        log.debug('reconfigure client auth caps')
        self.mds_cluster.mon_manager.raw_cluster_cmd_result(
            'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
                'mds', 'allow rw',
                'mon', 'allow r',
                'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
                    self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))

        log.debug(f'mounting filesystem {self.secondary_fs_name}')
        self.mount_b.umount_wait()
        self.mount_b.mount(cephfs_name=self.secondary_fs_name)

        # create a bunch of files in a directory to snap
        self.mount_a.run_shell(["mkdir", "d0"])
        self.mount_a.create_n_files('d0/file', 50, sync=True)

        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        # take a snapshot
        self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])

        time.sleep(30)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", '/d0', 'snap0', 1)
        self.verify_snapshot('d0', 'snap0')

        # some more IO
        self.mount_a.run_shell(["mkdir", "d0/d00"])
        self.mount_a.run_shell(["mkdir", "d0/d01"])

        self.mount_a.create_n_files('d0/d00/more_file', 20, sync=True)
        self.mount_a.create_n_files('d0/d01/some_more_file', 75, sync=True)

        # take another snapshot
        self.mount_a.run_shell(["mkdir", "d0/.snap/snap1"])

        time.sleep(60)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", '/d0', 'snap1', 2)
        self.verify_snapshot('d0', 'snap1')

        # delete a snapshot
        self.mount_a.run_shell(["rmdir", "d0/.snap/snap0"])

        time.sleep(10)
        snap_list = self.mount_b.ls(path='d0/.snap')
        self.assertTrue('snap0' not in snap_list)
        self.check_peer_status_deleted_snap(self.primary_fs_name, self.primary_fs_id,
                                            "client.mirror_remote@ceph", '/d0', 1)

        # rename a snapshot
        self.mount_a.run_shell(["mv", "d0/.snap/snap1", "d0/.snap/snap2"])

        time.sleep(10)
        snap_list = self.mount_b.ls(path='d0/.snap')
        self.assertTrue('snap1' not in snap_list)
        self.assertTrue('snap2' in snap_list)
        self.check_peer_status_renamed_snap(self.primary_fs_name, self.primary_fs_id,
                                            "client.mirror_remote@ceph", '/d0', 1)

        self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_cancel_sync(self):
        log.debug('reconfigure client auth caps')
        self.mds_cluster.mon_manager.raw_cluster_cmd_result(
            'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
                'mds', 'allow rw',
                'mon', 'allow r',
                'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
                    self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))

        log.debug(f'mounting filesystem {self.secondary_fs_name}')
        self.mount_b.umount_wait()
        self.mount_b.mount(cephfs_name=self.secondary_fs_name)

        # create a bunch of files in a directory to snap
        self.mount_a.run_shell(["mkdir", "d0"])
        for i in range(8):
            filename = f'file.{i}'
            self.mount_a.write_n_mb(os.path.join('d0', filename), 1024)

        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        # take a snapshot
        self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])

        time.sleep(10)
        self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id,
                                         "client.mirror_remote@ceph", '/d0', 'snap0')

        self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0')

        snap_list = self.mount_b.ls(path='d0/.snap')
        self.assertTrue('snap0' not in snap_list)
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_restart_sync_on_blocklist(self):
        log.debug('reconfigure client auth caps')
        self.mds_cluster.mon_manager.raw_cluster_cmd_result(
            'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
                'mds', 'allow rw',
                'mon', 'allow r',
                'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
                    self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))

        log.debug(f'mounting filesystem {self.secondary_fs_name}')
        self.mount_b.umount_wait()
        self.mount_b.mount(cephfs_name=self.secondary_fs_name)

        # create a bunch of files in a directory to snap
        self.mount_a.run_shell(["mkdir", "d0"])
        for i in range(8):
            filename = f'file.{i}'
            self.mount_a.write_n_mb(os.path.join('d0', filename), 1024)

        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        # fetch rados address for blocklist check
        rados_inst = self.get_mirror_rados_addr(self.primary_fs_name, self.primary_fs_id)

        # take a snapshot
        self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])

        time.sleep(10)
        self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id,
                                         "client.mirror_remote@ceph", '/d0', 'snap0')

        # simulate non-responding mirror daemon by sending SIGSTOP
        pid = self.get_mirror_daemon_pid()
        log.debug(f'SIGSTOP to cephfs-mirror pid {pid}')
        self.mount_a.run_shell(['kill', '-SIGSTOP', pid])

        # wait for blocklist timeout -- the manager module would blocklist
        # the mirror daemon
        time.sleep(40)

        # wake up the mirror daemon -- at this point, the daemon should know
        # that it has been blocklisted
        log.debug('SIGCONT to cephfs-mirror')
        self.mount_a.run_shell(['kill', '-SIGCONT', pid])

        # check if the rados addr is blocklisted
        blocklist = self.get_blocklisted_instances()
        self.assertTrue(rados_inst in blocklist)

        time.sleep(500)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", '/d0', 'snap0', expected_snap_count=1)
        self.verify_snapshot('d0', 'snap0')

        self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_failed_sync_with_correction(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        # add a non-existent directory for synchronization
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')

        # wait for the mirror daemon to mark the directory as failed
        time.sleep(120)
        self.verify_failed_directory(self.primary_fs_name, self.primary_fs_id,
                                     "client.mirror_remote@ceph", '/d0')

        # create the directory
        self.mount_a.run_shell(["mkdir", "d0"])
        self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])

        # wait for correction
        time.sleep(120)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", '/d0', 'snap0', 1)
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_service_daemon_status(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        time.sleep(30)
        status = self.get_mirror_daemon_status(self.primary_fs_name, self.primary_fs_id)

        # assumption for this test: mirroring enabled for a single filesystem w/ single
        # peer

        # we have not added any directories
        peer = status['filesystems'][0]['peers'][0]
        self.assertEqual(status['filesystems'][0]['directory_count'], 0)
        self.assertEqual(peer['stats']['failure_count'], 0)
        self.assertEqual(peer['stats']['recovery_count'], 0)

        # add a non-existent directory for synchronization -- check if it's
        # reported in daemon stats
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')

        time.sleep(120)
        status = self.get_mirror_daemon_status(self.primary_fs_name, self.primary_fs_id)
        # we added one
        peer = status['filesystems'][0]['peers'][0]
        self.assertEqual(status['filesystems'][0]['directory_count'], 1)
        # failure count should be reflected
        self.assertEqual(peer['stats']['failure_count'], 1)
        self.assertEqual(peer['stats']['recovery_count'], 0)

        # create the directory, mirror daemon would recover
        self.mount_a.run_shell(["mkdir", "d0"])

        time.sleep(120)
        status = self.get_mirror_daemon_status(self.primary_fs_name, self.primary_fs_id)
        peer = status['filesystems'][0]['peers'][0]
        self.assertEqual(status['filesystems'][0]['directory_count'], 1)
        # failure and recovery count should be reflected
        self.assertEqual(peer['stats']['failure_count'], 1)
        self.assertEqual(peer['stats']['recovery_count'], 1)

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

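    # The two init-failure tests below rely on the mirror daemon requiring a
    # 'cephfs_mirror' index object in the primary filesystem's metadata pool;
    # the recovery variant recreates it with a plain 'rados create'.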
    def test_mirroring_init_failure(self):
        """Test mirror daemon init failure"""

        # disable the mgr mirroring plugin as it would try to load the dir map
        # when mirroring is enabled for a filesystem (and throw up errors in
        # the logs)
        self.disable_mirroring_module()

        # enable mirroring through mon interface -- this should result in the mirror daemon
        # failing to enable mirroring due to absence of the `cephfs_mirror` index object.
        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "mirror", "enable", self.primary_fs_name)

        with safe_while(sleep=5, tries=10, action='wait for failed state') as proceed:
            while proceed():
                try:
                    # verify via asok
                    res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
                                                     'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
                    if 'state' not in res:
                        return
                    self.assertTrue(res['state'] == "failed")
                    break
                except:
                    pass

        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "mirror", "disable", self.primary_fs_name)
        time.sleep(10)
        # verify via asok
        try:
            self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
                                       'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
        except CommandFailedError:
            pass
        else:
            raise RuntimeError('expected admin socket to be unavailable')

    def test_mirroring_init_failure_with_recovery(self):
        """Test if the mirror daemon can recover from an init failure"""

        # disable the mgr mirroring plugin as it would try to load the dir map
        # when mirroring is enabled for a filesystem (and throw up errors in
        # the logs)
        self.disable_mirroring_module()

        # enable mirroring through mon interface -- this should result in the mirror daemon
        # failing to enable mirroring due to absence of the `cephfs_mirror` index object.

        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "mirror", "enable", self.primary_fs_name)
        # need safe_while since non-failed status pops up as mirroring is restarted
        # internally in mirror daemon.
        with safe_while(sleep=5, tries=20, action='wait for failed state') as proceed:
            while proceed():
                try:
                    # verify via asok
                    res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
                                                     'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
                    if 'state' not in res:
                        return
                    self.assertTrue(res['state'] == "failed")
                    break
                except:
                    pass

        # create the index object and check daemon recovery
        try:
            p = self.mount_a.client_remote.run(
                args=['rados', '-p', self.fs.metadata_pool_name, 'create', 'cephfs_mirror'],
                stdout=StringIO(), stderr=StringIO(), timeout=30,
                check_status=True, label="create index object")
            p.wait()
        except CommandFailedError as ce:
            log.warning(f'mirror daemon command to create mirror index object failed: {ce}')
            raise
        time.sleep(30)
        res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
                                         'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
        self.assertTrue(res['peers'] == {})
        self.assertTrue(res['snap_dirs']['dir_count'] == 0)

        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "mirror", "disable", self.primary_fs_name)
        time.sleep(10)
        # verify via asok
        try:
            self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
                                       'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
        except CommandFailedError:
            pass
        else:
            raise RuntimeError('expected admin socket to be unavailable')

    def test_cephfs_mirror_peer_bootstrap(self):
        """Test importing peer bootstrap token"""
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)

        # create a bootstrap token for the peer
        bootstrap_token = self.bootstrap_peer(self.secondary_fs_name, "client.mirror_peer_bootstrap", "site-remote")

        # import the peer via bootstrap token
        self.import_peer(self.primary_fs_name, bootstrap_token)
        time.sleep(10)
        self.verify_peer_added(self.primary_fs_name, self.primary_fs_id, "client.mirror_peer_bootstrap@site-remote",
                               self.secondary_fs_name)

        # verify via peer_list interface
        peer_uuid = self.get_peer_uuid("client.mirror_peer_bootstrap@site-remote")
        res = json.loads(self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_list", self.primary_fs_name))
        self.assertTrue(peer_uuid in res)
        self.assertTrue('mon_host' in res[peer_uuid] and res[peer_uuid]['mon_host'] != '')

        # remove peer
        self.peer_remove(self.primary_fs_name, self.primary_fs_id, "client.mirror_peer_bootstrap@site-remote")
        # disable mirroring
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_symlink_sync(self):
        log.debug('reconfigure client auth caps')
        self.mds_cluster.mon_manager.raw_cluster_cmd_result(
            'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
                'mds', 'allow rw',
                'mon', 'allow r',
                'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
                    self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))

        log.debug(f'mounting filesystem {self.secondary_fs_name}')
        self.mount_b.umount_wait()
        self.mount_b.mount(cephfs_name=self.secondary_fs_name)

        # create a bunch of files w/ symbolic links in a directory to snap
        self.mount_a.run_shell(["mkdir", "d0"])
        self.mount_a.create_n_files('d0/file', 10, sync=True)
        self.mount_a.run_shell(["ln", "-s", "./file_0", "d0/sym_0"])
        self.mount_a.run_shell(["ln", "-s", "./file_1", "d0/sym_1"])
        self.mount_a.run_shell(["ln", "-s", "./file_2", "d0/sym_2"])

        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        # take a snapshot
        self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])

        time.sleep(30)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", '/d0', 'snap0', 1)
        self.verify_snapshot('d0', 'snap0')

        self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_with_parent_snapshot(self):
        """Test snapshot synchronization with parent directory snapshots"""
        self.mount_a.run_shell(["mkdir", "-p", "d0/d1/d2/d3"])

        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0/d1/d2/d3')
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        # take a snapshot
        self.mount_a.run_shell(["mkdir", "d0/d1/d2/d3/.snap/snap0"])

        time.sleep(30)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", '/d0/d1/d2/d3', 'snap0', 1)

        # create snapshots in parent directories
        self.mount_a.run_shell(["mkdir", "d0/.snap/snap_d0"])
        self.mount_a.run_shell(["mkdir", "d0/d1/.snap/snap_d1"])
        self.mount_a.run_shell(["mkdir", "d0/d1/d2/.snap/snap_d2"])

        # try syncing more snapshots
        self.mount_a.run_shell(["mkdir", "d0/d1/d2/d3/.snap/snap1"])
        time.sleep(30)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", '/d0/d1/d2/d3', 'snap1', 2)

        self.mount_a.run_shell(["rmdir", "d0/d1/d2/d3/.snap/snap0"])
        self.mount_a.run_shell(["rmdir", "d0/d1/d2/d3/.snap/snap1"])
        time.sleep(15)
        self.check_peer_status_deleted_snap(self.primary_fs_name, self.primary_fs_id,
                                            "client.mirror_remote@ceph", '/d0/d1/d2/d3', 2)

        self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0/d1/d2/d3')
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_remove_on_stall(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)

        # fetch rados address for blocklist check
        rados_inst = self.get_mirror_rados_addr(self.primary_fs_name, self.primary_fs_id)

        # simulate non-responding mirror daemon by sending SIGSTOP
        pid = self.get_mirror_daemon_pid()
        log.debug(f'SIGSTOP to cephfs-mirror pid {pid}')
        self.mount_a.run_shell(['kill', '-SIGSTOP', pid])

        # wait for blocklist timeout -- the manager module would blocklist
        # the mirror daemon
        time.sleep(40)

        # make sure the rados addr is blocklisted
        blocklist = self.get_blocklisted_instances()
        self.assertTrue(rados_inst in blocklist)

        # now we are sure that there are no "active" mirror daemons -- add a directory path.
        dir_path_p = "/d0/d1"
        dir_path = "/d0/d1/d2"

        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "add", self.primary_fs_name, dir_path)

        time.sleep(10)
        # this uses an undocumented interface to get dirpath map state
        res_json = self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "dirmap", self.primary_fs_name, dir_path)
        res = json.loads(res_json)
        # there are no mirror daemons, so the directory should be stalled
        self.assertEqual(res['state'], 'stalled')

        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "remove", self.primary_fs_name, dir_path)

        time.sleep(10)
        try:
            self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "dirmap", self.primary_fs_name, dir_path)
        except CommandFailedError as ce:
            if ce.exitstatus != errno.ENOENT:
                raise RuntimeError('invalid errno when checking dirmap status for non-existent directory')
        else:
            raise RuntimeError('expected dirmap check to fail for non-existent directory')

        # adding a parent directory should be allowed
        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "add", self.primary_fs_name, dir_path_p)

        time.sleep(10)
        # however, this directory path should get stalled too
        res_json = self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "dirmap", self.primary_fs_name, dir_path_p)
        res = json.loads(res_json)
        # there are no mirror daemons, so the directory should be stalled
        self.assertEqual(res['state'], 'stalled')

        # wake up the mirror daemon -- at this point, the daemon should know
        # that it has been blocklisted
        log.debug('SIGCONT to cephfs-mirror')
        self.mount_a.run_shell(['kill', '-SIGCONT', pid])

        # wait for restart mirror on blocklist
        time.sleep(60)
        res_json = self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "dirmap", self.primary_fs_name, dir_path_p)
        res = json.loads(res_json)
        # with the daemon back, the directory should get mapped
        self.assertEqual(res['state'], 'mapped')

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

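    # The incremental-sync tests below clone the ceph-qa-suite repository
    # into the primary filesystem and use 'git reset --hard' / 'git pull'
    # to generate deltas between consecutive snapshots.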
    def test_cephfs_mirror_incremental_sync(self):
        """Test incremental snapshot synchronization (based on mtime differences)."""
        log.debug('reconfigure client auth caps')
        self.mds_cluster.mon_manager.raw_cluster_cmd_result(
            'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
                'mds', 'allow rw',
                'mon', 'allow r',
                'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
                    self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
        log.debug(f'mounting filesystem {self.secondary_fs_name}')
        self.mount_b.umount_wait()
        self.mount_b.mount(cephfs_name=self.secondary_fs_name)

        repo = 'ceph-qa-suite'
        repo_dir = 'ceph_repo'
        repo_path = f'{repo_dir}/{repo}'

        def clone_repo():
            self.mount_a.run_shell([
                'git', 'clone', '--branch', 'giant',
                f'http://github.com/ceph/{repo}', repo_path])

        def exec_git_cmd(cmd_list):
            self.mount_a.run_shell(['git', '--git-dir', f'{self.mount_a.mountpoint}/{repo_path}/.git', *cmd_list])

        self.mount_a.run_shell(["mkdir", repo_dir])
        clone_repo()

        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{repo_path}')
        self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_a'])

        # full copy, takes time
        time.sleep(500)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", f'/{repo_path}', 'snap_a', 1)
        self.verify_snapshot(repo_path, 'snap_a')

        # create some diff
        num = random.randint(5, 20)
        log.debug(f'resetting to HEAD~{num}')
        exec_git_cmd(["reset", "--hard", f'HEAD~{num}'])

        self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_b'])
        # incremental copy, should be fast
        time.sleep(180)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", f'/{repo_path}', 'snap_b', 2)
        self.verify_snapshot(repo_path, 'snap_b')

        # diff again, this time back to HEAD
        log.debug('resetting to HEAD')
        exec_git_cmd(["pull"])

        self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_c'])
        # incremental copy, should be fast
        time.sleep(180)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", f'/{repo_path}', 'snap_c', 3)
        self.verify_snapshot(repo_path, 'snap_c')

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_incremental_sync_with_type_mixup(self):
        """Test incremental snapshot synchronization with file type changes.

        The same filename exists as a different type in each subsequent
        snapshot. This verifies that the mirror daemon can identify the file
        type mismatch and sync the snapshots.

                  \    snap_0      snap_1      snap_2      snap_3
                   \-----------------------------------------------
          file_x   |   reg         sym         dir         reg
                   |
          file_y   |   dir         reg         sym         dir
                   |
          file_z   |   sym         dir         reg         sym
        """
        log.debug('reconfigure client auth caps')
        self.mds_cluster.mon_manager.raw_cluster_cmd_result(
            'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
                'mds', 'allow rw',
                'mon', 'allow r',
                'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
                    self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
        log.debug(f'mounting filesystem {self.secondary_fs_name}')
        self.mount_b.umount_wait()
        self.mount_b.mount(cephfs_name=self.secondary_fs_name)

        typs = deque(['reg', 'dir', 'sym'])
        def cleanup_and_create_with_type(dirname, fnames):
            self.mount_a.run_shell_payload(f"rm -rf {dirname}/*")
            fidx = 0
            for t in typs:
                fname = f'{dirname}/{fnames[fidx]}'
                log.debug(f'file: {fname} type: {t}')
                if t == 'reg':
                    self.mount_a.run_shell(["touch", fname])
                    self.mount_a.write_file(fname, data=fname)
                elif t == 'dir':
                    self.mount_a.run_shell(["mkdir", fname])
                elif t == 'sym':
                    # verify ELOOP in mirror daemon
                    self.mount_a.run_shell(["ln", "-s", "..", fname])
                fidx += 1

        def verify_types(dirname, fnames, snap_name):
            tidx = 0
            for fname in fnames:
                t = self.mount_b.run_shell_payload(f"stat -c %F {dirname}/.snap/{snap_name}/{fname}").stdout.getvalue().strip()
                if typs[tidx] == 'reg':
                    self.assertEqual('regular file', t)
                elif typs[tidx] == 'dir':
                    self.assertEqual('directory', t)
                elif typs[tidx] == 'sym':
                    self.assertEqual('symbolic link', t)
                tidx += 1

        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        self.mount_a.run_shell(["mkdir", "d0"])
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')

        fnames = ['file_x', 'file_y', 'file_z']
        turns = 0
        while turns != len(typs):
            snapname = f'snap_{turns}'
            cleanup_and_create_with_type('d0', fnames)
            self.mount_a.run_shell(['mkdir', f'd0/.snap/{snapname}'])
            time.sleep(30)
            self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                                   "client.mirror_remote@ceph", '/d0', snapname, turns+1)
            verify_types('d0', fnames, snapname)
            # next type
            typs.rotate(1)
            turns += 1

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_sync_with_purged_snapshot(self):
        """Test snapshot synchronization in the midst of snapshot deletes.

        Delete the previous snapshot while the mirror daemon is figuring out
        incremental differences between the current and previous snapshot. The
        mirror daemon should identify the purge and switch to using remote
        comparison to sync the snapshot (in the next iteration of course).
        """

        log.debug('reconfigure client auth caps')
        self.mds_cluster.mon_manager.raw_cluster_cmd_result(
            'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
                'mds', 'allow rw',
                'mon', 'allow r',
                'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
                    self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
        log.debug(f'mounting filesystem {self.secondary_fs_name}')
        self.mount_b.umount_wait()
        self.mount_b.mount(cephfs_name=self.secondary_fs_name)

        repo = 'ceph-qa-suite'
        repo_dir = 'ceph_repo'
        repo_path = f'{repo_dir}/{repo}'

        def clone_repo():
            self.mount_a.run_shell([
                'git', 'clone', '--branch', 'giant',
                f'http://github.com/ceph/{repo}', repo_path])

        def exec_git_cmd(cmd_list):
            self.mount_a.run_shell(['git', '--git-dir', f'{self.mount_a.mountpoint}/{repo_path}/.git', *cmd_list])

        self.mount_a.run_shell(["mkdir", repo_dir])
        clone_repo()

        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{repo_path}')
        self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_a'])

        # full copy, takes time
        time.sleep(500)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", f'/{repo_path}', 'snap_a', 1)
        self.verify_snapshot(repo_path, 'snap_a')

        # create some diff
        num = random.randint(60, 100)
        log.debug(f'resetting to HEAD~{num}')
        exec_git_cmd(["reset", "--hard", f'HEAD~{num}'])

        self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_b'])

        time.sleep(15)
        self.mount_a.run_shell(['rmdir', f'{repo_path}/.snap/snap_a'])

        # incremental copy but based on remote dir_root
        time.sleep(300)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", f'/{repo_path}', 'snap_b', 2)
        self.verify_snapshot(repo_path, 'snap_b')

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_peer_add_primary(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        # try adding the primary file system as a peer to the secondary file
        # system
        try:
            self.peer_add(self.secondary_fs_name, self.secondary_fs_id, "client.mirror_remote@ceph", self.primary_fs_name)
        except CommandFailedError as ce:
            if ce.exitstatus != errno.EINVAL:
                raise RuntimeError('invalid errno when adding a primary file system')
        else:
            raise RuntimeError('adding peer should fail')

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)