1 import os
2 import json
3 import errno
4 import logging
5 import random
6 import time
7
8 from io import StringIO
9 from collections import deque
10
11 from tasks.cephfs.cephfs_test_case import CephFSTestCase
12 from teuthology.exceptions import CommandFailedError
13 from teuthology.contextutil import safe_while
14
15 log = logging.getLogger(__name__)
16
17 class TestMirroring(CephFSTestCase):
18 MDSS_REQUIRED = 5
19 CLIENTS_REQUIRED = 2
20 REQUIRE_BACKUP_FILESYSTEM = True
21
22 MODULE_NAME = "mirroring"
23
24 def setUp(self):
25 super(TestMirroring, self).setUp()
26 self.primary_fs_name = self.fs.name
27 self.primary_fs_id = self.fs.id
28 self.secondary_fs_name = self.backup_fs.name
29 self.secondary_fs_id = self.backup_fs.id
30 self.enable_mirroring_module()
31
32 def tearDown(self):
33 self.disable_mirroring_module()
34 super(TestMirroring, self).tearDown()
35
36 def enable_mirroring_module(self):
37 self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "enable", TestMirroring.MODULE_NAME)
38
39 def disable_mirroring_module(self):
40 self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "disable", TestMirroring.MODULE_NAME)
41
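# The helpers below drive mirroring through the cluster CLI. For reference, the
# raw_cluster_cmd() calls used in this class correspond roughly to the following
# shell commands (illustrative, not exhaustive):
#
#   ceph mgr module enable mirroring
#   ceph fs snapshot mirror enable <fs_name>
#   ceph fs snapshot mirror peer_add <fs_name> <client>@<cluster> [<remote_fs_name>]
#   ceph fs snapshot mirror add <fs_name> <dir_path>
#
# State changes are then verified through the cephfs-mirror admin socket ("asok").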
42 def enable_mirroring(self, fs_name, fs_id):
43 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "enable", fs_name)
44 time.sleep(10)
45 # verify via asok
46 res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
47 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
48 self.assertTrue(res['peers'] == {})
49 self.assertTrue(res['snap_dirs']['dir_count'] == 0)
50
51 def disable_mirroring(self, fs_name, fs_id):
52 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "disable", fs_name)
53 time.sleep(10)
54 # verify via asok
55 try:
56 self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
57 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
58 except CommandFailedError:
59 pass
60 else:
61 raise RuntimeError('expected admin socket to be unavailable')
62
63 def verify_peer_added(self, fs_name, fs_id, peer_spec, remote_fs_name=None):
64 # verify via asok
65 res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
66 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
67 peer_uuid = self.get_peer_uuid(peer_spec)
68 self.assertTrue(peer_uuid in res['peers'])
69 client_name = res['peers'][peer_uuid]['remote']['client_name']
70 cluster_name = res['peers'][peer_uuid]['remote']['cluster_name']
71 self.assertTrue(peer_spec == f'{client_name}@{cluster_name}')
72 if remote_fs_name:
73 self.assertTrue(self.secondary_fs_name == res['peers'][peer_uuid]['remote']['fs_name'])
74 else:
75             self.assertTrue(self.primary_fs_name == res['peers'][peer_uuid]['remote']['fs_name'])
76
77 def peer_add(self, fs_name, fs_id, peer_spec, remote_fs_name=None):
78 if remote_fs_name:
79 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_add", fs_name, peer_spec, remote_fs_name)
80 else:
81 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_add", fs_name, peer_spec)
82 time.sleep(10)
83 self.verify_peer_added(fs_name, fs_id, peer_spec, remote_fs_name)
84
85 def peer_remove(self, fs_name, fs_id, peer_spec):
86 peer_uuid = self.get_peer_uuid(peer_spec)
87 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_remove", fs_name, peer_uuid)
88 time.sleep(10)
89 # verify via asok
90 res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
91 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
92 self.assertTrue(res['peers'] == {} and res['snap_dirs']['dir_count'] == 0)
93
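# Peer bootstrapping is the token-based alternative to peer_add: a token is
# created against the secondary ("remote") filesystem and then imported on the
# primary. Roughly (illustrative):
#
#   ceph fs snapshot mirror peer_bootstrap create <secondary_fs> <client_name> <site_name>
#   ceph fs snapshot mirror peer_bootstrap import <primary_fs> <token>
#
# See test_cephfs_mirror_peer_bootstrap below for the end-to-end flow.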
94 def bootstrap_peer(self, fs_name, client_name, site_name):
95 outj = json.loads(self.mgr_cluster.mon_manager.raw_cluster_cmd(
96 "fs", "snapshot", "mirror", "peer_bootstrap", "create", fs_name, client_name, site_name))
97 return outj['token']
98
99 def import_peer(self, fs_name, token):
100 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_bootstrap", "import",
101 fs_name, token)
102
103 def add_directory(self, fs_name, fs_id, dir_name):
104 # get initial dir count
105 res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
106 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
107 dir_count = res['snap_dirs']['dir_count']
108 log.debug(f'initial dir_count={dir_count}')
109
110 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "add", fs_name, dir_name)
111
112 time.sleep(10)
113 # verify via asok
114 res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
115 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
116 new_dir_count = res['snap_dirs']['dir_count']
117 log.debug(f'new dir_count={new_dir_count}')
118 self.assertTrue(new_dir_count > dir_count)
119
120 def remove_directory(self, fs_name, fs_id, dir_name):
121 # get initial dir count
122 res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
123 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
124 dir_count = res['snap_dirs']['dir_count']
125 log.debug(f'initial dir_count={dir_count}')
126
127 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "remove", fs_name, dir_name)
128
129 time.sleep(10)
130 # verify via asok
131 res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
132 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
133 new_dir_count = res['snap_dirs']['dir_count']
134 log.debug(f'new dir_count={new_dir_count}')
135 self.assertTrue(new_dir_count < dir_count)
136
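# The check_peer_* helpers below parse the per-peer status JSON returned by the
# daemon's "fs mirror peer status <fs_name>@<fs_id> <peer_uuid>" asok command.
# The fields consulted, per mirrored directory, are: 'state', 'last_synced_snap',
# 'current_sycning_snap' (note the spelling, which matches what this test reads),
# 'snaps_synced', 'snaps_deleted' and 'snaps_renamed'.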
137 def check_peer_status(self, fs_name, fs_id, peer_spec, dir_name, expected_snap_name,
138 expected_snap_count):
139 peer_uuid = self.get_peer_uuid(peer_spec)
140 res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
141 'fs', 'mirror', 'peer', 'status',
142 f'{fs_name}@{fs_id}', peer_uuid)
143 self.assertTrue(dir_name in res)
144 self.assertTrue(res[dir_name]['last_synced_snap']['name'] == expected_snap_name)
145 self.assertTrue(res[dir_name]['snaps_synced'] == expected_snap_count)
146
147 def check_peer_status_deleted_snap(self, fs_name, fs_id, peer_spec, dir_name,
148 expected_delete_count):
149 peer_uuid = self.get_peer_uuid(peer_spec)
150 res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
151 'fs', 'mirror', 'peer', 'status',
152 f'{fs_name}@{fs_id}', peer_uuid)
153 self.assertTrue(dir_name in res)
154 self.assertTrue(res[dir_name]['snaps_deleted'] == expected_delete_count)
155
156 def check_peer_status_renamed_snap(self, fs_name, fs_id, peer_spec, dir_name,
157 expected_rename_count):
158 peer_uuid = self.get_peer_uuid(peer_spec)
159 res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
160 'fs', 'mirror', 'peer', 'status',
161 f'{fs_name}@{fs_id}', peer_uuid)
162 self.assertTrue(dir_name in res)
163 self.assertTrue(res[dir_name]['snaps_renamed'] == expected_rename_count)
164
165 def check_peer_snap_in_progress(self, fs_name, fs_id,
166 peer_spec, dir_name, snap_name):
167 peer_uuid = self.get_peer_uuid(peer_spec)
168 res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
169 'fs', 'mirror', 'peer', 'status',
170 f'{fs_name}@{fs_id}', peer_uuid)
171 self.assertTrue('syncing' == res[dir_name]['state'])
172 self.assertTrue(res[dir_name]['current_sycning_snap']['name'] == snap_name)
173
174 def verify_snapshot(self, dir_name, snap_name):
175 snap_list = self.mount_b.ls(path=f'{dir_name}/.snap')
176 self.assertTrue(snap_name in snap_list)
177
178 source_res = self.mount_a.dir_checksum(path=f'{dir_name}/.snap/{snap_name}',
179 follow_symlinks=True)
180 log.debug(f'source snapshot checksum {snap_name} {source_res}')
181
182 dest_res = self.mount_b.dir_checksum(path=f'{dir_name}/.snap/{snap_name}',
183 follow_symlinks=True)
184 log.debug(f'destination snapshot checksum {snap_name} {dest_res}')
185 self.assertTrue(source_res == dest_res)
186
187 def verify_failed_directory(self, fs_name, fs_id, peer_spec, dir_name):
188 peer_uuid = self.get_peer_uuid(peer_spec)
189 res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
190 'fs', 'mirror', 'peer', 'status',
191 f'{fs_name}@{fs_id}', peer_uuid)
192 self.assertTrue('failed' == res[dir_name]['state'])
193
194 def get_peer_uuid(self, peer_spec):
195 status = self.fs.status()
196 fs_map = status.get_fsmap_byname(self.primary_fs_name)
197 peers = fs_map['mirror_info']['peers']
198 for peer_uuid, mirror_info in peers.items():
199 client_name = mirror_info['remote']['client_name']
200 cluster_name = mirror_info['remote']['cluster_name']
201 remote_peer_spec = f'{client_name}@{cluster_name}'
202 if peer_spec == remote_peer_spec:
203 return peer_uuid
204 return None
205
206 def get_daemon_admin_socket(self):
207 """overloaded by teuthology override (fs/mirror/clients/mirror.yaml)"""
208 return "/var/run/ceph/cephfs-mirror.asok"
209
210 def get_mirror_daemon_pid(self):
211 """pid file overloaded in fs/mirror/clients/mirror.yaml"""
212 return self.mount_a.run_shell(['cat', '/var/run/ceph/cephfs-mirror.pid']).stdout.getvalue().strip()
213
214 def get_mirror_rados_addr(self, fs_name, fs_id):
215 """return the rados addr used by cephfs-mirror instance"""
216 res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
217 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
218 return res['rados_inst']
219
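# mirror_daemon_command() talks to the cephfs-mirror daemon over its admin
# socket and expects JSON output. A manual equivalent would look roughly like:
#
#   ceph --admin-daemon /var/run/ceph/cephfs-mirror.asok \
#       fs mirror status <fs_name>@<fs_id>
#
# (The socket path above is the default from get_daemon_admin_socket(); the
# teuthology override in fs/mirror/clients/mirror.yaml may relocate it.)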
220 def mirror_daemon_command(self, cmd_label, *args):
221 asok_path = self.get_daemon_admin_socket()
222 try:
223 # use mount_a's remote to execute command
224 p = self.mount_a.client_remote.run(args=
225 ['ceph', '--admin-daemon', asok_path] + list(args),
226 stdout=StringIO(), stderr=StringIO(), timeout=30,
227 check_status=True, label=cmd_label)
228 p.wait()
229 except CommandFailedError as ce:
230             log.warning(f'mirror daemon command with label "{cmd_label}" failed: {ce}')
231 raise
232 res = p.stdout.getvalue().strip()
233 log.debug(f'command returned={res}')
234 return json.loads(res)
235
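# The mgr-side "fs snapshot mirror daemon status" output consumed below is a
# JSON list with one entry per mirror daemon. Only the fields that the tests
# assert on are sketched here:
#
#   [{"filesystems": [{"directory_count": <int>,
#                      "peers": [{"stats": {"failure_count": <int>,
#                                           "recovery_count": <int>}}]}]}]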
236 def get_mirror_daemon_status(self):
237 daemon_status = json.loads(self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "daemon", "status"))
238 log.debug(f'daemon_status: {daemon_status}')
239         # only running a single mirror daemon is supported; use the first (and only) entry
240 status = daemon_status[0]
241 log.debug(f'status: {status}')
242 return status
243
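# Most tests below follow the same pattern: enable mirroring on the primary
# filesystem, add a peer (the backup filesystem, reached as
# client.mirror_remote@ceph), register one or more directories, create snapshots
# under <dir>/.snap on mount_a, wait for the daemon to pick them up, and then
# verify progress via the asok peer status and/or by comparing checksums on
# mount_b (which is remounted against the backup filesystem).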
244 def test_basic_mirror_commands(self):
245 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
246 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
247
248 def test_mirror_peer_commands(self):
249 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
250
251 # add peer
252 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
253 # remove peer
254 self.peer_remove(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph")
255
256 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
257
258 def test_mirror_disable_with_peer(self):
259 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
260
261 # add peer
262 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
263
264 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
265
266 def test_matching_peer(self):
267 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
268
269 try:
270 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph")
271 except CommandFailedError as ce:
272 if ce.exitstatus != errno.EINVAL:
273 raise RuntimeError('invalid errno when adding a matching remote peer')
274 else:
275 raise RuntimeError('adding a peer matching local spec should fail')
276
277 # verify via asok -- nothing should get added
278 res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
279 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
280 self.assertTrue(res['peers'] == {})
281
282 # and explicitly specifying the spec (via filesystem name) should fail too
283 try:
284 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.primary_fs_name)
285 except CommandFailedError as ce:
286 if ce.exitstatus != errno.EINVAL:
287 raise RuntimeError('invalid errno when adding a matching remote peer')
288 else:
289 raise RuntimeError('adding a peer matching local spec should fail')
290
291 # verify via asok -- nothing should get added
292 res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
293 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
294 self.assertTrue(res['peers'] == {})
295
296 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
297
298 def test_mirror_peer_add_existing(self):
299 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
300
301 # add peer
302 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
303
304 # adding the same peer should be idempotent
305 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
306
307 # remove peer
308 self.peer_remove(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph")
309
310 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
311
312 def test_peer_commands_with_mirroring_disabled(self):
313 # try adding peer when mirroring is not enabled
314 try:
315 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
316 except CommandFailedError as ce:
317 if ce.exitstatus != errno.EINVAL:
318 raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a peer')
319 else:
320 raise RuntimeError(-errno.EINVAL, 'expected peer_add to fail')
321
322 # try removing peer
323 try:
324 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_remove", self.primary_fs_name, 'dummy-uuid')
325 except CommandFailedError as ce:
326 if ce.exitstatus != errno.EINVAL:
327 raise RuntimeError(-errno.EINVAL, 'incorrect error code when removing a peer')
328 else:
329 raise RuntimeError(-errno.EINVAL, 'expected peer_remove to fail')
330
331 def test_add_directory_with_mirroring_disabled(self):
332 # try adding a directory when mirroring is not enabled
333 try:
334 self.add_directory(self.primary_fs_name, self.primary_fs_id, "/d1")
335 except CommandFailedError as ce:
336 if ce.exitstatus != errno.EINVAL:
337 raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a directory')
338 else:
339 raise RuntimeError(-errno.EINVAL, 'expected directory add to fail')
340
341 def test_directory_commands(self):
342 self.mount_a.run_shell(["mkdir", "d1"])
343 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
344 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1')
345 try:
346 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1')
347 except CommandFailedError as ce:
348 if ce.exitstatus != errno.EEXIST:
349 raise RuntimeError(-errno.EINVAL, 'incorrect error code when re-adding a directory')
350 else:
351 raise RuntimeError(-errno.EINVAL, 'expected directory add to fail')
352 self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d1')
353 try:
354 self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d1')
355 except CommandFailedError as ce:
356 if ce.exitstatus not in (errno.ENOENT, errno.EINVAL):
357 raise RuntimeError(-errno.EINVAL, 'incorrect error code when re-deleting a directory')
358 else:
359 raise RuntimeError(-errno.EINVAL, 'expected directory removal to fail')
360 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
361 self.mount_a.run_shell(["rmdir", "d1"])
362
363 def test_add_relative_directory_path(self):
364 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
365 try:
366 self.add_directory(self.primary_fs_name, self.primary_fs_id, './d1')
367 except CommandFailedError as ce:
368 if ce.exitstatus != errno.EINVAL:
369 raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a relative path dir')
370 else:
371 raise RuntimeError(-errno.EINVAL, 'expected directory add to fail')
372 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
373
374 def test_add_directory_path_normalization(self):
375 self.mount_a.run_shell(["mkdir", "-p", "d1/d2/d3"])
376 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
377 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1/d2/d3')
378 def check_add_command_failure(dir_path):
379 try:
380 self.add_directory(self.primary_fs_name, self.primary_fs_id, dir_path)
381 except CommandFailedError as ce:
382 if ce.exitstatus != errno.EEXIST:
383 raise RuntimeError(-errno.EINVAL, 'incorrect error code when re-adding a directory')
384 else:
385 raise RuntimeError(-errno.EINVAL, 'expected directory add to fail')
386
387         # every path below resolves to /d1/d2/d3
388 check_add_command_failure('/d1/d2/././././././d3')
389 check_add_command_failure('/d1/d2/././././././d3//////')
390 check_add_command_failure('/d1/d2/../d2/././././d3')
391 check_add_command_failure('/././././d1/./././d2/./././d3//////')
392 check_add_command_failure('/./d1/./d2/./d3/../../../d1/d2/d3')
393
394 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
395 self.mount_a.run_shell(["rm", "-rf", "d1"])
396
397 def test_add_ancestor_and_child_directory(self):
398 self.mount_a.run_shell(["mkdir", "-p", "d1/d2/d3"])
399 self.mount_a.run_shell(["mkdir", "-p", "d1/d4"])
400 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
401 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1/d2/')
402 def check_add_command_failure(dir_path):
403 try:
404 self.add_directory(self.primary_fs_name, self.primary_fs_id, dir_path)
405 except CommandFailedError as ce:
406 if ce.exitstatus != errno.EINVAL:
407 raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a directory')
408 else:
409 raise RuntimeError(-errno.EINVAL, 'expected directory add to fail')
410
411 # cannot add ancestors or a subtree for an existing directory
412 check_add_command_failure('/')
413 check_add_command_failure('/d1')
414 check_add_command_failure('/d1/d2/d3')
415
416 # obviously, one can add a non-ancestor or non-subtree
417 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1/d4/')
418
419 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
420 self.mount_a.run_shell(["rm", "-rf", "d1"])
421
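# A few tests simulate an unresponsive mirror daemon by SIGSTOP-ing its process.
# After the blocklist timeout the mgr mirroring module blocklists the daemon's
# RADOS instance; once the process is SIGCONT-ed it notices the blocklisting and
# re-registers with a new RADOS address, after which mirroring resumes.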
422 def test_cephfs_mirror_blocklist(self):
423 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
424
425 # add peer
426 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
427
428 res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
429 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
430 peers_1 = set(res['peers'])
431
432         # fetch rados address for blocklist check
433 rados_inst = self.get_mirror_rados_addr(self.primary_fs_name, self.primary_fs_id)
434
435 # simulate non-responding mirror daemon by sending SIGSTOP
436 pid = self.get_mirror_daemon_pid()
437 log.debug(f'SIGSTOP to cephfs-mirror pid {pid}')
438 self.mount_a.run_shell(['kill', '-SIGSTOP', pid])
439
440 # wait for blocklist timeout -- the manager module would blocklist
441 # the mirror daemon
442 time.sleep(40)
443
444 # wake up the mirror daemon -- at this point, the daemon should know
445 # that it has been blocklisted
446 log.debug('SIGCONT to cephfs-mirror')
447 self.mount_a.run_shell(['kill', '-SIGCONT', pid])
448
449 # check if the rados addr is blocklisted
450 self.assertTrue(self.mds_cluster.is_addr_blocklisted(rados_inst))
451
452 # wait enough so that the mirror daemon restarts blocklisted instances
453 time.sleep(40)
454 rados_inst_new = self.get_mirror_rados_addr(self.primary_fs_name, self.primary_fs_id)
455
456 # and we should get a new rados instance
457 self.assertTrue(rados_inst != rados_inst_new)
458
459 # along with peers that were added
460 res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
461 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
462 peers_2 = set(res['peers'])
463         self.assertTrue(peers_1 == peers_2)
464
465 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
466
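# Tests that verify synced data on the peer first widen mount_b's client caps to
# cover the backup filesystem's pools and then remount mount_b against the
# secondary (backup) filesystem, so that <dir>/.snap on mount_b reflects the
# mirrored snapshots.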
467 def test_cephfs_mirror_stats(self):
468 log.debug('reconfigure client auth caps')
469 self.mds_cluster.mon_manager.raw_cluster_cmd_result(
470 'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
471 'mds', 'allow rw',
472 'mon', 'allow r',
473 'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
474 self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
475
476 log.debug(f'mounting filesystem {self.secondary_fs_name}')
477 self.mount_b.umount_wait()
478 self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name)
479
480 # create a bunch of files in a directory to snap
481 self.mount_a.run_shell(["mkdir", "d0"])
482 self.mount_a.create_n_files('d0/file', 50, sync=True)
483
484 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
485 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
486 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
487
488 # take a snapshot
489 self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])
490
491 time.sleep(30)
492 self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
493 "client.mirror_remote@ceph", '/d0', 'snap0', 1)
494 self.verify_snapshot('d0', 'snap0')
495
496 # some more IO
497 self.mount_a.run_shell(["mkdir", "d0/d00"])
498 self.mount_a.run_shell(["mkdir", "d0/d01"])
499
500 self.mount_a.create_n_files('d0/d00/more_file', 20, sync=True)
501 self.mount_a.create_n_files('d0/d01/some_more_file', 75, sync=True)
502
503 # take another snapshot
504 self.mount_a.run_shell(["mkdir", "d0/.snap/snap1"])
505
506 time.sleep(60)
507 self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
508 "client.mirror_remote@ceph", '/d0', 'snap1', 2)
509 self.verify_snapshot('d0', 'snap1')
510
511 # delete a snapshot
512 self.mount_a.run_shell(["rmdir", "d0/.snap/snap0"])
513
514 time.sleep(10)
515 snap_list = self.mount_b.ls(path='d0/.snap')
516 self.assertTrue('snap0' not in snap_list)
517 self.check_peer_status_deleted_snap(self.primary_fs_name, self.primary_fs_id,
518 "client.mirror_remote@ceph", '/d0', 1)
519
520 # rename a snapshot
521 self.mount_a.run_shell(["mv", "d0/.snap/snap1", "d0/.snap/snap2"])
522
523 time.sleep(10)
524 snap_list = self.mount_b.ls(path='d0/.snap')
525 self.assertTrue('snap1' not in snap_list)
526 self.assertTrue('snap2' in snap_list)
527 self.check_peer_status_renamed_snap(self.primary_fs_name, self.primary_fs_id,
528 "client.mirror_remote@ceph", '/d0', 1)
529
530 self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
531 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
532
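# Removing a directory while its snapshot is still being synchronized should
# cancel the in-flight sync; the partially transferred snapshot must not show up
# under the corresponding .snap directory on the peer.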
533 def test_cephfs_mirror_cancel_sync(self):
534 log.debug('reconfigure client auth caps')
535 self.mds_cluster.mon_manager.raw_cluster_cmd_result(
536 'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
537 'mds', 'allow rw',
538 'mon', 'allow r',
539 'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
540 self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
541
542 log.debug(f'mounting filesystem {self.secondary_fs_name}')
543 self.mount_b.umount_wait()
544 self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name)
545
546 # create a bunch of files in a directory to snap
547 self.mount_a.run_shell(["mkdir", "d0"])
548 for i in range(8):
549 filename = f'file.{i}'
550 self.mount_a.write_n_mb(os.path.join('d0', filename), 1024)
551
552 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
553 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
554 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
555
556 # take a snapshot
557 self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])
558
559 time.sleep(10)
560 self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id,
561 "client.mirror_remote@ceph", '/d0', 'snap0')
562
563 self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
564
565 snap_list = self.mount_b.ls(path='d0/.snap')
566 self.assertTrue('snap0' not in snap_list)
567 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
568
569 def test_cephfs_mirror_restart_sync_on_blocklist(self):
570 log.debug('reconfigure client auth caps')
571 self.mds_cluster.mon_manager.raw_cluster_cmd_result(
572 'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
573 'mds', 'allow rw',
574 'mon', 'allow r',
575 'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
576 self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
577
578 log.debug(f'mounting filesystem {self.secondary_fs_name}')
579 self.mount_b.umount_wait()
580 self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name)
581
582 # create a bunch of files in a directory to snap
583 self.mount_a.run_shell(["mkdir", "d0"])
584 for i in range(8):
585 filename = f'file.{i}'
586 self.mount_a.write_n_mb(os.path.join('d0', filename), 1024)
587
588 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
589 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
590 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
591
592         # fetch rados address for blocklist check
593 rados_inst = self.get_mirror_rados_addr(self.primary_fs_name, self.primary_fs_id)
594
595 # take a snapshot
596 self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])
597
598 time.sleep(10)
599 self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id,
600 "client.mirror_remote@ceph", '/d0', 'snap0')
601
602 # simulate non-responding mirror daemon by sending SIGSTOP
603 pid = self.get_mirror_daemon_pid()
604 log.debug(f'SIGSTOP to cephfs-mirror pid {pid}')
605 self.mount_a.run_shell(['kill', '-SIGSTOP', pid])
606
607 # wait for blocklist timeout -- the manager module would blocklist
608 # the mirror daemon
609 time.sleep(40)
610
611 # wake up the mirror daemon -- at this point, the daemon should know
612 # that it has been blocklisted
613 log.debug('SIGCONT to cephfs-mirror')
614 self.mount_a.run_shell(['kill', '-SIGCONT', pid])
615
616 # check if the rados addr is blocklisted
617 self.assertTrue(self.mds_cluster.is_addr_blocklisted(rados_inst))
618
619 time.sleep(500)
620 self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
621 "client.mirror_remote@ceph", '/d0', 'snap0', expected_snap_count=1)
622 self.verify_snapshot('d0', 'snap0')
623
624 self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
625 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
626
627 def test_cephfs_mirror_failed_sync_with_correction(self):
628 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
629 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
630
631 # add a non-existent directory for synchronization
632 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
633
634         # wait for the mirror daemon to mark the directory as failed
635 time.sleep(120)
636 self.verify_failed_directory(self.primary_fs_name, self.primary_fs_id,
637 "client.mirror_remote@ceph", '/d0')
638
639 # create the directory
640 self.mount_a.run_shell(["mkdir", "d0"])
641 self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])
642
643 # wait for correction
644 time.sleep(120)
645 self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
646 "client.mirror_remote@ceph", '/d0', 'snap0', 1)
647 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
648
649 def test_cephfs_mirror_service_daemon_status(self):
650 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
651 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
652
653 time.sleep(30)
654 status = self.get_mirror_daemon_status()
655
656 # assumption for this test: mirroring enabled for a single filesystem w/ single
657 # peer
658
659 # we have not added any directories
660 peer = status['filesystems'][0]['peers'][0]
661 self.assertEquals(status['filesystems'][0]['directory_count'], 0)
662 self.assertEquals(peer['stats']['failure_count'], 0)
663 self.assertEquals(peer['stats']['recovery_count'], 0)
664
665         # add a non-existent directory for synchronization -- check if it's reported
666 # in daemon stats
667 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
668
669 time.sleep(120)
670 status = self.get_mirror_daemon_status()
671 # we added one
672 peer = status['filesystems'][0]['peers'][0]
673 self.assertEquals(status['filesystems'][0]['directory_count'], 1)
674 # failure count should be reflected
675 self.assertEquals(peer['stats']['failure_count'], 1)
676 self.assertEquals(peer['stats']['recovery_count'], 0)
677
678 # create the directory, mirror daemon would recover
679 self.mount_a.run_shell(["mkdir", "d0"])
680
681 time.sleep(120)
682 status = self.get_mirror_daemon_status()
683 peer = status['filesystems'][0]['peers'][0]
684 self.assertEquals(status['filesystems'][0]['directory_count'], 1)
685 # failure and recovery count should be reflected
686 self.assertEquals(peer['stats']['failure_count'], 1)
687 self.assertEquals(peer['stats']['recovery_count'], 1)
688
689 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
690
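# The init-failure tests bypass the mgr mirroring module and enable mirroring
# through the mon interface ("fs mirror enable" rather than "fs snapshot mirror
# enable"), so the `cephfs_mirror` index object is never created and the daemon
# is expected to report a "failed" state for the filesystem.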
691 def test_mirroring_init_failure(self):
692 """Test mirror daemon init failure"""
693
694         # disable the mgr mirroring plugin as it would try to load the dir map
695         # when mirroring is enabled for a filesystem (and throw up errors in
696         # the logs)
697 self.disable_mirroring_module()
698
699 # enable mirroring through mon interface -- this should result in the mirror daemon
700         # failing to enable mirroring due to absence of the `cephfs_mirror` index object.
701 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "mirror", "enable", self.primary_fs_name)
702
703 with safe_while(sleep=5, tries=10, action='wait for failed state') as proceed:
704 while proceed():
705 try:
706 # verify via asok
707 res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
708 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
709 if not 'state' in res:
710 return
711 self.assertTrue(res['state'] == "failed")
712 return True
713 except:
714 pass
715
716 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "mirror", "disable", self.primary_fs_name)
717 time.sleep(10)
718 # verify via asok
719 try:
720 self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
721 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
722 except CommandFailedError:
723 pass
724 else:
725 raise RuntimeError('expected admin socket to be unavailable')
726
727 def test_mirroring_init_failure_with_recovery(self):
728 """Test if the mirror daemon can recover from a init failure"""
729
730         # disable the mgr mirroring plugin as it would try to load the dir map
731         # when mirroring is enabled for a filesystem (and throw up errors in
732         # the logs)
733 self.disable_mirroring_module()
734
735 # enable mirroring through mon interface -- this should result in the mirror daemon
736 # failing to enable mirroring due to absence of `cephfs_mirror` index object.
737
738 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "mirror", "enable", self.primary_fs_name)
739 # need safe_while since non-failed status pops up as mirroring is restarted
740 # internally in mirror daemon.
741 with safe_while(sleep=5, tries=20, action='wait for failed state') as proceed:
742 while proceed():
743 try:
744 # verify via asok
745 res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
746 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
747 if not 'state' in res:
748 return
749 self.assertTrue(res['state'] == "failed")
750 return True
751 except:
752 pass
753
754 # create the index object and check daemon recovery
755 try:
756 p = self.mount_a.client_remote.run(args=['rados', '-p', self.fs.metadata_pool_name, 'create', 'cephfs_mirror'],
757 stdout=StringIO(), stderr=StringIO(), timeout=30,
758 check_status=True, label="create index object")
759 p.wait()
760 except CommandFailedError as ce:
761             log.warning(f'mirror daemon command to create mirror index object failed: {ce}')
762 raise
763 time.sleep(30)
764 res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
765 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
766 self.assertTrue(res['peers'] == {})
767 self.assertTrue(res['snap_dirs']['dir_count'] == 0)
768
769 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "mirror", "disable", self.primary_fs_name)
770 time.sleep(10)
771 # verify via asok
772 try:
773 self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
774 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
775 except CommandFailedError:
776 pass
777 else:
778 raise RuntimeError('expected admin socket to be unavailable')
779
780 def test_cephfs_mirror_peer_bootstrap(self):
781 """Test importing peer bootstrap token"""
782 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
783
784 # create a bootstrap token for the peer
785 bootstrap_token = self.bootstrap_peer(self.secondary_fs_name, "client.mirror_peer_bootstrap", "site-remote")
786
787 # import the peer via bootstrap token
788 self.import_peer(self.primary_fs_name, bootstrap_token)
789 time.sleep(10)
790 self.verify_peer_added(self.primary_fs_name, self.primary_fs_id, "client.mirror_peer_bootstrap@site-remote",
791 self.secondary_fs_name)
792
793 # verify via peer_list interface
794 peer_uuid = self.get_peer_uuid("client.mirror_peer_bootstrap@site-remote")
795 res = json.loads(self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_list", self.primary_fs_name))
796 self.assertTrue(peer_uuid in res)
797 self.assertTrue('mon_host' in res[peer_uuid] and res[peer_uuid]['mon_host'] != '')
798
799 # remove peer
800 self.peer_remove(self.primary_fs_name, self.primary_fs_id, "client.mirror_peer_bootstrap@site-remote")
801 # disable mirroring
802 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
803
804 def test_cephfs_mirror_symlink_sync(self):
805 log.debug('reconfigure client auth caps')
806 self.mds_cluster.mon_manager.raw_cluster_cmd_result(
807 'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
808 'mds', 'allow rw',
809 'mon', 'allow r',
810 'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
811 self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
812
813 log.debug(f'mounting filesystem {self.secondary_fs_name}')
814 self.mount_b.umount_wait()
815 self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name)
816
817 # create a bunch of files w/ symbolic links in a directory to snap
818 self.mount_a.run_shell(["mkdir", "d0"])
819 self.mount_a.create_n_files('d0/file', 10, sync=True)
820 self.mount_a.run_shell(["ln", "-s", "./file_0", "d0/sym_0"])
821 self.mount_a.run_shell(["ln", "-s", "./file_1", "d0/sym_1"])
822 self.mount_a.run_shell(["ln", "-s", "./file_2", "d0/sym_2"])
823
824 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
825 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
826 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
827
828 # take a snapshot
829 self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])
830
831 time.sleep(30)
832 self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
833 "client.mirror_remote@ceph", '/d0', 'snap0', 1)
834 self.verify_snapshot('d0', 'snap0')
835
836 self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
837 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
838
839 def test_cephfs_mirror_with_parent_snapshot(self):
840 """Test snapshot synchronization with parent directory snapshots"""
841 self.mount_a.run_shell(["mkdir", "-p", "d0/d1/d2/d3"])
842
843 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
844 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0/d1/d2/d3')
845 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
846
847 # take a snapshot
848 self.mount_a.run_shell(["mkdir", "d0/d1/d2/d3/.snap/snap0"])
849
850 time.sleep(30)
851 self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
852 "client.mirror_remote@ceph", '/d0/d1/d2/d3', 'snap0', 1)
853
854 # create snapshots in parent directories
855 self.mount_a.run_shell(["mkdir", "d0/.snap/snap_d0"])
856 self.mount_a.run_shell(["mkdir", "d0/d1/.snap/snap_d1"])
857 self.mount_a.run_shell(["mkdir", "d0/d1/d2/.snap/snap_d2"])
858
859 # try syncing more snapshots
860 self.mount_a.run_shell(["mkdir", "d0/d1/d2/d3/.snap/snap1"])
861 time.sleep(30)
862 self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
863 "client.mirror_remote@ceph", '/d0/d1/d2/d3', 'snap1', 2)
864
865 self.mount_a.run_shell(["rmdir", "d0/d1/d2/d3/.snap/snap0"])
866 self.mount_a.run_shell(["rmdir", "d0/d1/d2/d3/.snap/snap1"])
867 time.sleep(15)
868 self.check_peer_status_deleted_snap(self.primary_fs_name, self.primary_fs_id,
869 "client.mirror_remote@ceph", '/d0/d1/d2/d3', 2)
870
871 self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0/d1/d2/d3')
872 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
873
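# With no active mirror daemon (simulated below by SIGSTOP-ing and blocklisting
# the daemon), the mgr module cannot map a newly added directory to a daemon;
# the (undocumented) "fs snapshot mirror dirmap" command then reports the
# directory as 'stalled', and it should move to 'mapped' once a daemon is
# available again.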
874 def test_cephfs_mirror_remove_on_stall(self):
875 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
876
877         # fetch rados address for blocklist check
878 rados_inst = self.get_mirror_rados_addr(self.primary_fs_name, self.primary_fs_id)
879
880 # simulate non-responding mirror daemon by sending SIGSTOP
881 pid = self.get_mirror_daemon_pid()
882 log.debug(f'SIGSTOP to cephfs-mirror pid {pid}')
883 self.mount_a.run_shell(['kill', '-SIGSTOP', pid])
884
885 # wait for blocklist timeout -- the manager module would blocklist
886 # the mirror daemon
887 time.sleep(40)
888
889 # make sure the rados addr is blocklisted
890 self.assertTrue(self.mds_cluster.is_addr_blocklisted(rados_inst))
891
892 # now we are sure that there are no "active" mirror daemons -- add a directory path.
893 dir_path_p = "/d0/d1"
894 dir_path = "/d0/d1/d2"
895
896 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "add", self.primary_fs_name, dir_path)
897
898 time.sleep(10)
899 # this uses an undocumented interface to get dirpath map state
900 res_json = self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "dirmap", self.primary_fs_name, dir_path)
901 res = json.loads(res_json)
902 # there are no mirror daemons
903         self.assertTrue(res['state'] == 'stalled')
904
905 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "remove", self.primary_fs_name, dir_path)
906
907 time.sleep(10)
908 try:
909 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "dirmap", self.primary_fs_name, dir_path)
910 except CommandFailedError as ce:
911 if ce.exitstatus != errno.ENOENT:
912 raise RuntimeError('invalid errno when checking dirmap status for non-existent directory')
913 else:
914             raise RuntimeError('expected dirmap command to fail for a non-existent directory')
915
916 # adding a parent directory should be allowed
917 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "add", self.primary_fs_name, dir_path_p)
918
919 time.sleep(10)
920 # however, this directory path should get stalled too
921 res_json = self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "dirmap", self.primary_fs_name, dir_path_p)
922 res = json.loads(res_json)
923 # there are no mirror daemons
924         self.assertTrue(res['state'] == 'stalled')
925
926 # wake up the mirror daemon -- at this point, the daemon should know
927 # that it has been blocklisted
928 log.debug('SIGCONT to cephfs-mirror')
929 self.mount_a.run_shell(['kill', '-SIGCONT', pid])
930
931 # wait for restart mirror on blocklist
932 time.sleep(60)
933 res_json = self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "dirmap", self.primary_fs_name, dir_path_p)
934 res = json.loads(res_json)
935         # the daemon is back, so the directory should be mapped again
936         self.assertTrue(res['state'] == 'mapped')
937
938 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
939
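# The incremental-sync tests clone the ceph-qa-suite repository into the primary
# filesystem, snapshot it, and then use `git reset --hard HEAD~N` / `git pull`
# between snapshots to generate modest differences; the first snapshot is a full
# copy while subsequent ones should transfer only the delta (hence the shorter
# sleeps).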
940 def test_cephfs_mirror_incremental_sync(self):
941 """ Test incremental snapshot synchronization (based on mtime differences)."""
942 log.debug('reconfigure client auth caps')
943 self.mds_cluster.mon_manager.raw_cluster_cmd_result(
944 'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
945 'mds', 'allow rw',
946 'mon', 'allow r',
947 'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
948 self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
949 log.debug(f'mounting filesystem {self.secondary_fs_name}')
950 self.mount_b.umount_wait()
951 self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name)
952
953 repo = 'ceph-qa-suite'
954 repo_dir = 'ceph_repo'
955 repo_path = f'{repo_dir}/{repo}'
956
957 def clone_repo():
958 self.mount_a.run_shell([
959 'git', 'clone', '--branch', 'giant',
960 f'http://github.com/ceph/{repo}', repo_path])
961
962 def exec_git_cmd(cmd_list):
963 self.mount_a.run_shell(['git', '--git-dir', f'{self.mount_a.mountpoint}/{repo_path}/.git', *cmd_list])
964
965 self.mount_a.run_shell(["mkdir", repo_dir])
966 clone_repo()
967
968 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
969 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
970
971 self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{repo_path}')
972 self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_a'])
973
974 # full copy, takes time
975 time.sleep(500)
976 self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
977 "client.mirror_remote@ceph", f'/{repo_path}', 'snap_a', 1)
978 self.verify_snapshot(repo_path, 'snap_a')
979
980 # create some diff
981 num = random.randint(5, 20)
982 log.debug(f'resetting to HEAD~{num}')
983 exec_git_cmd(["reset", "--hard", f'HEAD~{num}'])
984
985 self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_b'])
986 # incremental copy, should be fast
987 time.sleep(180)
988 self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
989 "client.mirror_remote@ceph", f'/{repo_path}', 'snap_b', 2)
990 self.verify_snapshot(repo_path, 'snap_b')
991
992 # diff again, this time back to HEAD
993 log.debug('resetting to HEAD')
994 exec_git_cmd(["pull"])
995
996 self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_c'])
997 # incremental copy, should be fast
998 time.sleep(180)
999 self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
1000 "client.mirror_remote@ceph", f'/{repo_path}', 'snap_c', 3)
1001 self.verify_snapshot(repo_path, 'snap_c')
1002
1003 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
1004
1005 def test_cephfs_mirror_incremental_sync_with_type_mixup(self):
1006 """ Test incremental snapshot synchronization with file type changes.
1007
1008         The same filename exists as a different type in each subsequent snapshot.
1009         This verifies that the mirror daemon can identify the file type mismatch and
1010         still sync the snapshots.
1011
1012 \ snap_0 snap_1 snap_2 snap_3
1013 \-----------------------------------------------
1014 file_x | reg sym dir reg
1015 |
1016 file_y | dir reg sym dir
1017 |
1018 file_z | sym dir reg sym
1019 """
1020 log.debug('reconfigure client auth caps')
1021 self.mds_cluster.mon_manager.raw_cluster_cmd_result(
1022 'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
1023 'mds', 'allow rw',
1024 'mon', 'allow r',
1025 'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
1026 self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
1027 log.debug(f'mounting filesystem {self.secondary_fs_name}')
1028 self.mount_b.umount_wait()
1029 self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name)
1030
1031 typs = deque(['reg', 'dir', 'sym'])
1032 def cleanup_and_create_with_type(dirname, fnames):
1033 self.mount_a.run_shell_payload(f"rm -rf {dirname}/*")
1034 fidx = 0
1035 for t in typs:
1036 fname = f'{dirname}/{fnames[fidx]}'
1037 log.debug(f'file: {fname} type: {t}')
1038 if t == 'reg':
1039 self.mount_a.run_shell(["touch", fname])
1040 self.mount_a.write_file(fname, data=fname)
1041 elif t == 'dir':
1042 self.mount_a.run_shell(["mkdir", fname])
1043 elif t == 'sym':
1044 # verify ELOOP in mirror daemon
1045 self.mount_a.run_shell(["ln", "-s", "..", fname])
1046 fidx += 1
1047
1048 def verify_types(dirname, fnames, snap_name):
1049 tidx = 0
1050 for fname in fnames:
1051 t = self.mount_b.run_shell_payload(f"stat -c %F {dirname}/.snap/{snap_name}/{fname}").stdout.getvalue().strip()
1052 if typs[tidx] == 'reg':
1053 self.assertEquals('regular file', t)
1054 elif typs[tidx] == 'dir':
1055 self.assertEquals('directory', t)
1056 elif typs[tidx] == 'sym':
1057 self.assertEquals('symbolic link', t)
1058 tidx += 1
1059
1060 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
1061 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
1062
1063 self.mount_a.run_shell(["mkdir", "d0"])
1064 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
1065
1066 fnames = ['file_x', 'file_y', 'file_z']
1067 turns = 0
1068 while turns != len(typs):
1069 snapname = f'snap_{turns}'
1070 cleanup_and_create_with_type('d0', fnames)
1071 self.mount_a.run_shell(['mkdir', f'd0/.snap/{snapname}'])
1072 time.sleep(30)
1073 self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
1074 "client.mirror_remote@ceph", '/d0', snapname, turns+1)
1075 verify_types('d0', fnames, snapname)
1076 # next type
1077 typs.rotate(1)
1078 turns += 1
1079
1080 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
1081
1082 def test_cephfs_mirror_sync_with_purged_snapshot(self):
1083 """Test snapshot synchronization in midst of snapshot deletes.
1084
1085 Deleted the previous snapshot when the mirror daemon is figuring out
1086 incremental differences between current and previous snaphot. The
1087 mirror daemon should identify the purge and switch to using remote
1088 comparison to sync the snapshot (in the next iteration of course).
1089 """
1090
1091 log.debug('reconfigure client auth caps')
1092 self.mds_cluster.mon_manager.raw_cluster_cmd_result(
1093 'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
1094 'mds', 'allow rw',
1095 'mon', 'allow r',
1096 'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
1097 self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
1098 log.debug(f'mounting filesystem {self.secondary_fs_name}')
1099 self.mount_b.umount_wait()
1100 self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name)
1101
1102 repo = 'ceph-qa-suite'
1103 repo_dir = 'ceph_repo'
1104 repo_path = f'{repo_dir}/{repo}'
1105
1106 def clone_repo():
1107 self.mount_a.run_shell([
1108 'git', 'clone', '--branch', 'giant',
1109 f'http://github.com/ceph/{repo}', repo_path])
1110
1111 def exec_git_cmd(cmd_list):
1112 self.mount_a.run_shell(['git', '--git-dir', f'{self.mount_a.mountpoint}/{repo_path}/.git', *cmd_list])
1113
1114 self.mount_a.run_shell(["mkdir", repo_dir])
1115 clone_repo()
1116
1117 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
1118 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
1119
1120 self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{repo_path}')
1121 self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_a'])
1122
1123 # full copy, takes time
1124 time.sleep(500)
1125 self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
1126 "client.mirror_remote@ceph", f'/{repo_path}', 'snap_a', 1)
1127 self.verify_snapshot(repo_path, 'snap_a')
1128
1129 # create some diff
1130 num = random.randint(60, 100)
1131 log.debug(f'resetting to HEAD~{num}')
1132 exec_git_cmd(["reset", "--hard", f'HEAD~{num}'])
1133
1134 self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_b'])
1135
1136 time.sleep(15)
1137 self.mount_a.run_shell(['rmdir', f'{repo_path}/.snap/snap_a'])
1138
1139 # incremental copy but based on remote dir_root
1140 time.sleep(300)
1141 self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
1142 "client.mirror_remote@ceph", f'/{repo_path}', 'snap_b', 2)
1143 self.verify_snapshot(repo_path, 'snap_b')
1144
1145 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
1146
1147 def test_cephfs_mirror_peer_add_primary(self):
1148 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
1149 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
1150
1151 # try adding the primary file system as a peer to secondary file
1152 # system
1153 try:
1154 self.peer_add(self.secondary_fs_name, self.secondary_fs_id, "client.mirror_remote@ceph", self.primary_fs_name)
1155 except CommandFailedError as ce:
1156 if ce.exitstatus != errno.EINVAL:
1157 raise RuntimeError('invalid errno when adding a primary file system')
1158 else:
1159 raise RuntimeError('adding peer should fail')
1160
1161 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
1162
1163 def test_cephfs_mirror_cancel_mirroring_and_readd(self):
1164 """
1165         Test adding a directory path for synchronization after removal of previously added directory paths
1166
1167 ... to ensure that synchronization of the newly added directory path functions
1168 as expected. Note that we schedule three (3) directories for mirroring to ensure
1169 that all replayer threads (3 by default) in the mirror daemon are busy.
1170 """
1171 log.debug('reconfigure client auth caps')
1172 self.mds_cluster.mon_manager.raw_cluster_cmd_result(
1173 'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
1174 'mds', 'allow rw',
1175 'mon', 'allow r',
1176 'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
1177 self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
1178
1179 log.debug(f'mounting filesystem {self.secondary_fs_name}')
1180 self.mount_b.umount_wait()
1181 self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name)
1182
1183 # create a bunch of files in a directory to snap
1184 self.mount_a.run_shell(["mkdir", "d0"])
1185 self.mount_a.run_shell(["mkdir", "d1"])
1186 self.mount_a.run_shell(["mkdir", "d2"])
1187 for i in range(4):
1188 filename = f'file.{i}'
1189 self.mount_a.write_n_mb(os.path.join('d0', filename), 1024)
1190 self.mount_a.write_n_mb(os.path.join('d1', filename), 1024)
1191 self.mount_a.write_n_mb(os.path.join('d2', filename), 1024)
1192
1193 log.debug('enabling mirroring')
1194 self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
1195 log.debug('adding directory paths')
1196 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
1197 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1')
1198 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d2')
1199 self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
1200
1201 # take snapshots
1202 log.debug('taking snapshots')
1203 self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])
1204 self.mount_a.run_shell(["mkdir", "d1/.snap/snap0"])
1205 self.mount_a.run_shell(["mkdir", "d2/.snap/snap0"])
1206
1207 time.sleep(10)
1208 log.debug('checking snap in progress')
1209 self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id,
1210 "client.mirror_remote@ceph", '/d0', 'snap0')
1211 self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id,
1212 "client.mirror_remote@ceph", '/d1', 'snap0')
1213 self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id,
1214 "client.mirror_remote@ceph", '/d2', 'snap0')
1215
1216 log.debug('removing directories 1')
1217 self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
1218 log.debug('removing directories 2')
1219 self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d1')
1220 log.debug('removing directories 3')
1221 self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d2')
1222
1223 log.debug('removing snapshots')
1224 self.mount_a.run_shell(["rmdir", "d0/.snap/snap0"])
1225 self.mount_a.run_shell(["rmdir", "d1/.snap/snap0"])
1226 self.mount_a.run_shell(["rmdir", "d2/.snap/snap0"])
1227
1228 for i in range(4):
1229 filename = f'file.{i}'
1230 log.debug(f'deleting {filename}')
1231 self.mount_a.run_shell(["rm", "-f", os.path.join('d0', filename)])
1232 self.mount_a.run_shell(["rm", "-f", os.path.join('d1', filename)])
1233 self.mount_a.run_shell(["rm", "-f", os.path.join('d2', filename)])
1234
1235 log.debug('creating new files...')
1236 self.mount_a.create_n_files('d0/file', 50, sync=True)
1237 self.mount_a.create_n_files('d1/file', 50, sync=True)
1238 self.mount_a.create_n_files('d2/file', 50, sync=True)
1239
1240 log.debug('adding directory paths')
1241 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
1242 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1')
1243 self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d2')
1244
1245 log.debug('creating new snapshots...')
1246 self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])
1247 self.mount_a.run_shell(["mkdir", "d1/.snap/snap0"])
1248 self.mount_a.run_shell(["mkdir", "d2/.snap/snap0"])
1249
1250 time.sleep(60)
1251 self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
1252 "client.mirror_remote@ceph", '/d0', 'snap0', 1)
1253 self.verify_snapshot('d0', 'snap0')
1254
1255 self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
1256 "client.mirror_remote@ceph", '/d1', 'snap0', 1)
1257 self.verify_snapshot('d1', 'snap0')
1258
1259 self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
1260 "client.mirror_remote@ceph", '/d2', 'snap0', 1)
1261 self.verify_snapshot('d2', 'snap0')
1262
1263 self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)