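"""Tests for CephFS snapshot mirroring.

These tests drive the `fs snapshot mirror ...` manager-module commands and the
`fs mirror ...` mon commands, and verify the cephfs-mirror daemon's view of the
configuration and sync progress through its admin socket. A primary and a
backup (secondary) filesystem are required.
"""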
import os
import json
import errno
import logging
import random
import time

from io import StringIO
from collections import deque

from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.exceptions import CommandFailedError
from teuthology.contextutil import safe_while

log = logging.getLogger(__name__)

class TestMirroring(CephFSTestCase):
    MDSS_REQUIRED = 5
    CLIENTS_REQUIRED = 2
    REQUIRE_BACKUP_FILESYSTEM = True

    MODULE_NAME = "mirroring"

    def setUp(self):
        super(TestMirroring, self).setUp()
        self.primary_fs_name = self.fs.name
        self.primary_fs_id = self.fs.id
        self.secondary_fs_name = self.backup_fs.name
        self.secondary_fs_id = self.backup_fs.id
        self.enable_mirroring_module()

    def tearDown(self):
        self.disable_mirroring_module()
        super(TestMirroring, self).tearDown()

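    # The helpers below wrap the mirroring CLI ("fs snapshot mirror ...") and
    # then cross-check the result against the cephfs-mirror daemon's admin
    # socket ("asok") output; the fixed sleeps give the daemon time to pick up
    # configuration changes.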
    def enable_mirroring_module(self):
        self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "enable", TestMirroring.MODULE_NAME)

    def disable_mirroring_module(self):
        self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "disable", TestMirroring.MODULE_NAME)

    def enable_mirroring(self, fs_name, fs_id):
        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "enable", fs_name)
        time.sleep(10)
        # verify via asok
        res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
                                         'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
        self.assertTrue(res['peers'] == {})
        self.assertTrue(res['snap_dirs']['dir_count'] == 0)

    def disable_mirroring(self, fs_name, fs_id):
        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "disable", fs_name)
        time.sleep(10)
        # verify via asok
        try:
            self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
                                       'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
        except CommandFailedError:
            pass
        else:
            raise RuntimeError('expected admin socket to be unavailable')

    def verify_peer_added(self, fs_name, fs_id, peer_spec, remote_fs_name=None):
        # verify via asok
        res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
                                         'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
        peer_uuid = self.get_peer_uuid(peer_spec)
        self.assertTrue(peer_uuid in res['peers'])
        client_name = res['peers'][peer_uuid]['remote']['client_name']
        cluster_name = res['peers'][peer_uuid]['remote']['cluster_name']
        self.assertTrue(peer_spec == f'{client_name}@{cluster_name}')
        if remote_fs_name:
            self.assertTrue(self.secondary_fs_name == res['peers'][peer_uuid]['remote']['fs_name'])
        else:
            # remote fs name defaults to the local fs name when not specified
            self.assertTrue(fs_name == res['peers'][peer_uuid]['remote']['fs_name'])

    def peer_add(self, fs_name, fs_id, peer_spec, remote_fs_name=None):
        if remote_fs_name:
            self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_add", fs_name, peer_spec, remote_fs_name)
        else:
            self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_add", fs_name, peer_spec)
        time.sleep(10)
        self.verify_peer_added(fs_name, fs_id, peer_spec, remote_fs_name)

    def peer_remove(self, fs_name, fs_id, peer_spec):
        peer_uuid = self.get_peer_uuid(peer_spec)
        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_remove", fs_name, peer_uuid)
        time.sleep(10)
        # verify via asok
        res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
                                         'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
        self.assertTrue(res['peers'] == {} and res['snap_dirs']['dir_count'] == 0)

    def bootstrap_peer(self, fs_name, client_name, site_name):
        outj = json.loads(self.mgr_cluster.mon_manager.raw_cluster_cmd(
            "fs", "snapshot", "mirror", "peer_bootstrap", "create", fs_name, client_name, site_name))
        return outj['token']

    def import_peer(self, fs_name, token):
        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_bootstrap", "import",
                                                     fs_name, token)

    def add_directory(self, fs_name, fs_id, dir_name):
        # get initial dir count
        res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
                                         'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
        dir_count = res['snap_dirs']['dir_count']
        log.debug(f'initial dir_count={dir_count}')

        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "add", fs_name, dir_name)

        time.sleep(10)
        # verify via asok
        res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
                                         'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
        new_dir_count = res['snap_dirs']['dir_count']
        log.debug(f'new dir_count={new_dir_count}')
        self.assertTrue(new_dir_count > dir_count)

    def remove_directory(self, fs_name, fs_id, dir_name):
        # get initial dir count
        res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
                                         'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
        dir_count = res['snap_dirs']['dir_count']
        log.debug(f'initial dir_count={dir_count}')

        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "remove", fs_name, dir_name)

        time.sleep(10)
        # verify via asok
        res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
                                         'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
        new_dir_count = res['snap_dirs']['dir_count']
        log.debug(f'new dir_count={new_dir_count}')
        self.assertTrue(new_dir_count < dir_count)

    def check_peer_status(self, fs_name, fs_id, peer_spec, dir_name, expected_snap_name,
                          expected_snap_count):
        peer_uuid = self.get_peer_uuid(peer_spec)
        res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
                                         'fs', 'mirror', 'peer', 'status',
                                         f'{fs_name}@{fs_id}', peer_uuid)
        self.assertTrue(dir_name in res)
        self.assertTrue(res[dir_name]['last_synced_snap']['name'] == expected_snap_name)
        self.assertTrue(res[dir_name]['snaps_synced'] == expected_snap_count)

    def check_peer_status_deleted_snap(self, fs_name, fs_id, peer_spec, dir_name,
                                       expected_delete_count):
        peer_uuid = self.get_peer_uuid(peer_spec)
        res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
                                         'fs', 'mirror', 'peer', 'status',
                                         f'{fs_name}@{fs_id}', peer_uuid)
        self.assertTrue(dir_name in res)
        self.assertTrue(res[dir_name]['snaps_deleted'] == expected_delete_count)

    def check_peer_status_renamed_snap(self, fs_name, fs_id, peer_spec, dir_name,
                                       expected_rename_count):
        peer_uuid = self.get_peer_uuid(peer_spec)
        res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
                                         'fs', 'mirror', 'peer', 'status',
                                         f'{fs_name}@{fs_id}', peer_uuid)
        self.assertTrue(dir_name in res)
        self.assertTrue(res[dir_name]['snaps_renamed'] == expected_rename_count)

    def check_peer_snap_in_progress(self, fs_name, fs_id,
                                    peer_spec, dir_name, snap_name):
        peer_uuid = self.get_peer_uuid(peer_spec)
        res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
                                         'fs', 'mirror', 'peer', 'status',
                                         f'{fs_name}@{fs_id}', peer_uuid)
        self.assertTrue('syncing' == res[dir_name]['state'])
        self.assertTrue(res[dir_name]['current_sycning_snap']['name'] == snap_name)

    def verify_snapshot(self, dir_name, snap_name):
        snap_list = self.mount_b.ls(path=f'{dir_name}/.snap')
        self.assertTrue(snap_name in snap_list)

        source_res = self.mount_a.dir_checksum(path=f'{dir_name}/.snap/{snap_name}',
                                               follow_symlinks=True)
        log.debug(f'source snapshot checksum {snap_name} {source_res}')

        dest_res = self.mount_b.dir_checksum(path=f'{dir_name}/.snap/{snap_name}',
                                             follow_symlinks=True)
        log.debug(f'destination snapshot checksum {snap_name} {dest_res}')
        self.assertTrue(source_res == dest_res)

    def verify_failed_directory(self, fs_name, fs_id, peer_spec, dir_name):
        peer_uuid = self.get_peer_uuid(peer_spec)
        res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
                                         'fs', 'mirror', 'peer', 'status',
                                         f'{fs_name}@{fs_id}', peer_uuid)
        self.assertTrue('failed' == res[dir_name]['state'])

    def get_peer_uuid(self, peer_spec):
        status = self.fs.status()
        fs_map = status.get_fsmap_byname(self.primary_fs_name)
        peers = fs_map['mirror_info']['peers']
        for peer_uuid, mirror_info in peers.items():
            client_name = mirror_info['remote']['client_name']
            cluster_name = mirror_info['remote']['cluster_name']
            remote_peer_spec = f'{client_name}@{cluster_name}'
            if peer_spec == remote_peer_spec:
                return peer_uuid
        return None

    def get_daemon_admin_socket(self):
        """overloaded by teuthology override (fs/mirror/clients/mirror.yaml)"""
        return "/var/run/ceph/cephfs-mirror.asok"

    def get_mirror_daemon_pid(self):
        """pid file overloaded in fs/mirror/clients/mirror.yaml"""
        return self.mount_a.run_shell(['cat', '/var/run/ceph/cephfs-mirror.pid']).stdout.getvalue().strip()

    def get_mirror_rados_addr(self, fs_name, fs_id):
        """return the rados addr used by cephfs-mirror instance"""
        res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
                                         'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
        return res['rados_inst']

    def get_blocklisted_instances(self):
        return json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd(
            "osd", "dump", "--format=json-pretty"))['blocklist']

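    # mirror_daemon_command() runs `ceph --admin-daemon <asok> <args...>` on
    # mount_a's node and returns the parsed JSON output; callers treat a
    # CommandFailedError as "admin socket not available".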
    def mirror_daemon_command(self, cmd_label, *args):
        asok_path = self.get_daemon_admin_socket()
        try:
            # use mount_a's remote to execute command
            p = self.mount_a.client_remote.run(args=
                     ['ceph', '--admin-daemon', asok_path] + list(args),
                     stdout=StringIO(), stderr=StringIO(), timeout=30,
                     check_status=True, label=cmd_label)
            p.wait()
        except CommandFailedError as ce:
            log.warning(f'mirror daemon command with label "{cmd_label}" failed: {ce}')
            raise
        res = p.stdout.getvalue().strip()
        log.debug(f'command returned={res}')
        return json.loads(res)

    def get_mirror_daemon_status(self, fs_name, fs_id):
        daemon_status = json.loads(self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "daemon", "status", fs_name))
        log.debug(f'daemon_status: {daemon_status}')
        # running a single mirror daemon is supported
        status = daemon_status[0]
        log.debug(f'status: {status}')
        return status

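    # Most tests below follow the same flow: enable mirroring on the primary
    # filesystem, optionally add a peer and one or more directories, create or
    # manipulate snapshots via mount_a, then verify sync state via the daemon's
    # admin socket and the secondary mount (mount_b) before cleaning up.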
    def test_basic_mirror_commands(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_mirror_peer_commands(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)

        # add peer
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
        # remove peer
        self.peer_remove(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph")

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_mirror_disable_with_peer(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)

        # add peer
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_matching_peer(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)

        try:
            self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph")
        except CommandFailedError as ce:
            if ce.exitstatus != errno.EINVAL:
                raise RuntimeError('invalid errno when adding a matching remote peer')
        else:
            raise RuntimeError('adding a peer matching local spec should fail')

        # verify via asok -- nothing should get added
        res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
                                         'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
        self.assertTrue(res['peers'] == {})

        # and explicitly specifying the spec (via filesystem name) should fail too
        try:
            self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.primary_fs_name)
        except CommandFailedError as ce:
            if ce.exitstatus != errno.EINVAL:
                raise RuntimeError('invalid errno when adding a matching remote peer')
        else:
            raise RuntimeError('adding a peer matching local spec should fail')

        # verify via asok -- nothing should get added
        res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
                                         'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
        self.assertTrue(res['peers'] == {})

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_mirror_peer_add_existing(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)

        # add peer
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        # adding the same peer should be idempotent
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        # remove peer
        self.peer_remove(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph")

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_peer_commands_with_mirroring_disabled(self):
        # try adding peer when mirroring is not enabled
        try:
            self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
        except CommandFailedError as ce:
            if ce.exitstatus != errno.EINVAL:
                raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a peer')
        else:
            raise RuntimeError(-errno.EINVAL, 'expected peer_add to fail')

        # try removing peer
        try:
            self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_remove", self.primary_fs_name, 'dummy-uuid')
        except CommandFailedError as ce:
            if ce.exitstatus != errno.EINVAL:
                raise RuntimeError(-errno.EINVAL, 'incorrect error code when removing a peer')
        else:
            raise RuntimeError(-errno.EINVAL, 'expected peer_remove to fail')

    def test_add_directory_with_mirroring_disabled(self):
        # try adding a directory when mirroring is not enabled
        try:
            self.add_directory(self.primary_fs_name, self.primary_fs_id, "/d1")
        except CommandFailedError as ce:
            if ce.exitstatus != errno.EINVAL:
                raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a directory')
        else:
            raise RuntimeError(-errno.EINVAL, 'expected directory add to fail')

    def test_directory_commands(self):
        self.mount_a.run_shell(["mkdir", "d1"])
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1')
        try:
            self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1')
        except CommandFailedError as ce:
            if ce.exitstatus != errno.EEXIST:
                raise RuntimeError(-errno.EINVAL, 'incorrect error code when re-adding a directory')
        else:
            raise RuntimeError(-errno.EINVAL, 'expected directory add to fail')
        self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d1')
        try:
            self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d1')
        except CommandFailedError as ce:
            if ce.exitstatus not in (errno.ENOENT, errno.EINVAL):
                raise RuntimeError(-errno.EINVAL, 'incorrect error code when re-deleting a directory')
        else:
            raise RuntimeError(-errno.EINVAL, 'expected directory removal to fail')
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.mount_a.run_shell(["rmdir", "d1"])

    def test_add_relative_directory_path(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        try:
            self.add_directory(self.primary_fs_name, self.primary_fs_id, './d1')
        except CommandFailedError as ce:
            if ce.exitstatus != errno.EINVAL:
                raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a relative path dir')
        else:
            raise RuntimeError(-errno.EINVAL, 'expected directory add to fail')
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_add_directory_path_normalization(self):
        self.mount_a.run_shell(["mkdir", "-p", "d1/d2/d3"])
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1/d2/d3')
        def check_add_command_failure(dir_path):
            try:
                self.add_directory(self.primary_fs_name, self.primary_fs_id, dir_path)
            except CommandFailedError as ce:
                if ce.exitstatus != errno.EEXIST:
                    raise RuntimeError(-errno.EINVAL, 'incorrect error code when re-adding a directory')
            else:
                raise RuntimeError(-errno.EINVAL, 'expected directory add to fail')

        # all of these paths normalize to /d1/d2/d3
        check_add_command_failure('/d1/d2/././././././d3')
        check_add_command_failure('/d1/d2/././././././d3//////')
        check_add_command_failure('/d1/d2/../d2/././././d3')
        check_add_command_failure('/././././d1/./././d2/./././d3//////')
        check_add_command_failure('/./d1/./d2/./d3/../../../d1/d2/d3')

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.mount_a.run_shell(["rm", "-rf", "d1"])

    def test_add_ancestor_and_child_directory(self):
        self.mount_a.run_shell(["mkdir", "-p", "d1/d2/d3"])
        self.mount_a.run_shell(["mkdir", "-p", "d1/d4"])
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1/d2/')
        def check_add_command_failure(dir_path):
            try:
                self.add_directory(self.primary_fs_name, self.primary_fs_id, dir_path)
            except CommandFailedError as ce:
                if ce.exitstatus != errno.EINVAL:
                    raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a directory')
            else:
                raise RuntimeError(-errno.EINVAL, 'expected directory add to fail')

        # cannot add ancestors or a subtree of an existing directory
        check_add_command_failure('/')
        check_add_command_failure('/d1')
        check_add_command_failure('/d1/d2/d3')

        # obviously, one can add a non-ancestor or non-subtree
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1/d4/')

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.mount_a.run_shell(["rm", "-rf", "d1"])

    def test_cephfs_mirror_blocklist(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)

        # add peer
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
                                         'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
        peers_1 = set(res['peers'])

        # fetch rados address for blocklist check
        rados_inst = self.get_mirror_rados_addr(self.primary_fs_name, self.primary_fs_id)

        # simulate non-responding mirror daemon by sending SIGSTOP
        pid = self.get_mirror_daemon_pid()
        log.debug(f'SIGSTOP to cephfs-mirror pid {pid}')
        self.mount_a.run_shell(['kill', '-SIGSTOP', pid])

        # wait for blocklist timeout -- the manager module would blocklist
        # the mirror daemon
        time.sleep(40)

        # wake up the mirror daemon -- at this point, the daemon should know
        # that it has been blocklisted
        log.debug('SIGCONT to cephfs-mirror')
        self.mount_a.run_shell(['kill', '-SIGCONT', pid])

        # check if the rados addr is blocklisted
        blocklist = self.get_blocklisted_instances()
        self.assertTrue(rados_inst in blocklist)

        # wait long enough for the mirror daemon to restart blocklisted instances
        time.sleep(40)
        rados_inst_new = self.get_mirror_rados_addr(self.primary_fs_name, self.primary_fs_id)

        # and we should get a new rados instance
        self.assertTrue(rados_inst != rados_inst_new)

        # along with the peers that were added
        res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
                                         'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
        peers_2 = set(res['peers'])
        self.assertTrue(peers_1 == peers_2)

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_stats(self):
        log.debug('reconfigure client auth caps')
        self.mds_cluster.mon_manager.raw_cluster_cmd_result(
            'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
                'mds', 'allow rw',
                'mon', 'allow r',
                'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
                    self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))

        log.debug(f'mounting filesystem {self.secondary_fs_name}')
        self.mount_b.umount_wait()
        self.mount_b.mount(cephfs_name=self.secondary_fs_name)

        # create a bunch of files in a directory to snap
        self.mount_a.run_shell(["mkdir", "d0"])
        self.mount_a.create_n_files('d0/file', 50, sync=True)

        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        # take a snapshot
        self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])

        time.sleep(30)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", '/d0', 'snap0', 1)
        self.verify_snapshot('d0', 'snap0')

        # some more IO
        self.mount_a.run_shell(["mkdir", "d0/d00"])
        self.mount_a.run_shell(["mkdir", "d0/d01"])

        self.mount_a.create_n_files('d0/d00/more_file', 20, sync=True)
        self.mount_a.create_n_files('d0/d01/some_more_file', 75, sync=True)

        # take another snapshot
        self.mount_a.run_shell(["mkdir", "d0/.snap/snap1"])

        time.sleep(60)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", '/d0', 'snap1', 2)
        self.verify_snapshot('d0', 'snap1')

        # delete a snapshot
        self.mount_a.run_shell(["rmdir", "d0/.snap/snap0"])

        time.sleep(10)
        snap_list = self.mount_b.ls(path='d0/.snap')
        self.assertTrue('snap0' not in snap_list)
        self.check_peer_status_deleted_snap(self.primary_fs_name, self.primary_fs_id,
                                            "client.mirror_remote@ceph", '/d0', 1)

        # rename a snapshot
        self.mount_a.run_shell(["mv", "d0/.snap/snap1", "d0/.snap/snap2"])

        time.sleep(10)
        snap_list = self.mount_b.ls(path='d0/.snap')
        self.assertTrue('snap1' not in snap_list)
        self.assertTrue('snap2' in snap_list)
        self.check_peer_status_renamed_snap(self.primary_fs_name, self.primary_fs_id,
                                            "client.mirror_remote@ceph", '/d0', 1)

        self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_cancel_sync(self):
        log.debug('reconfigure client auth caps')
        self.mds_cluster.mon_manager.raw_cluster_cmd_result(
            'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
                'mds', 'allow rw',
                'mon', 'allow r',
                'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
                    self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))

        log.debug(f'mounting filesystem {self.secondary_fs_name}')
        self.mount_b.umount_wait()
        self.mount_b.mount(cephfs_name=self.secondary_fs_name)

        # create a bunch of files in a directory to snap
        self.mount_a.run_shell(["mkdir", "d0"])
        for i in range(8):
            filename = f'file.{i}'
            self.mount_a.write_n_mb(os.path.join('d0', filename), 1024)

        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        # take a snapshot
        self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])

        time.sleep(10)
        self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id,
                                         "client.mirror_remote@ceph", '/d0', 'snap0')

        self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0')

        snap_list = self.mount_b.ls(path='d0/.snap')
        self.assertTrue('snap0' not in snap_list)
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_restart_sync_on_blocklist(self):
        log.debug('reconfigure client auth caps')
        self.mds_cluster.mon_manager.raw_cluster_cmd_result(
            'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
                'mds', 'allow rw',
                'mon', 'allow r',
                'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
                    self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))

        log.debug(f'mounting filesystem {self.secondary_fs_name}')
        self.mount_b.umount_wait()
        self.mount_b.mount(cephfs_name=self.secondary_fs_name)

        # create a bunch of files in a directory to snap
        self.mount_a.run_shell(["mkdir", "d0"])
        for i in range(8):
            filename = f'file.{i}'
            self.mount_a.write_n_mb(os.path.join('d0', filename), 1024)

        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        # fetch rados address for blocklist check
        rados_inst = self.get_mirror_rados_addr(self.primary_fs_name, self.primary_fs_id)

        # take a snapshot
        self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])

        time.sleep(10)
        self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id,
                                         "client.mirror_remote@ceph", '/d0', 'snap0')

        # simulate non-responding mirror daemon by sending SIGSTOP
        pid = self.get_mirror_daemon_pid()
        log.debug(f'SIGSTOP to cephfs-mirror pid {pid}')
        self.mount_a.run_shell(['kill', '-SIGSTOP', pid])

        # wait for blocklist timeout -- the manager module would blocklist
        # the mirror daemon
        time.sleep(40)

        # wake up the mirror daemon -- at this point, the daemon should know
        # that it has been blocklisted
        log.debug('SIGCONT to cephfs-mirror')
        self.mount_a.run_shell(['kill', '-SIGCONT', pid])

        # check if the rados addr is blocklisted
        blocklist = self.get_blocklisted_instances()
        self.assertTrue(rados_inst in blocklist)

        time.sleep(500)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", '/d0', 'snap0', expected_snap_count=1)
        self.verify_snapshot('d0', 'snap0')

        self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_failed_sync_with_correction(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        # add a non-existent directory for synchronization
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')

        # wait for the mirror daemon to mark the directory as failed
        time.sleep(120)
        self.verify_failed_directory(self.primary_fs_name, self.primary_fs_id,
                                     "client.mirror_remote@ceph", '/d0')

        # create the directory
        self.mount_a.run_shell(["mkdir", "d0"])
        self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])

        # wait for correction
        time.sleep(120)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", '/d0', 'snap0', 1)
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_service_daemon_status(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        time.sleep(30)
        status = self.get_mirror_daemon_status(self.primary_fs_name, self.primary_fs_id)

        # assumption for this test: mirroring enabled for a single filesystem w/ single
        # peer

        # we have not added any directories
        peer = status['filesystems'][0]['peers'][0]
        self.assertEquals(status['filesystems'][0]['directory_count'], 0)
        self.assertEquals(peer['stats']['failure_count'], 0)
        self.assertEquals(peer['stats']['recovery_count'], 0)

        # add a non-existent directory for synchronization -- check if it's reported
        # in daemon stats
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')

        time.sleep(120)
        status = self.get_mirror_daemon_status(self.primary_fs_name, self.primary_fs_id)
        # we added one
        peer = status['filesystems'][0]['peers'][0]
        self.assertEquals(status['filesystems'][0]['directory_count'], 1)
        # failure count should be reflected
        self.assertEquals(peer['stats']['failure_count'], 1)
        self.assertEquals(peer['stats']['recovery_count'], 0)

        # create the directory, mirror daemon would recover
        self.mount_a.run_shell(["mkdir", "d0"])

        time.sleep(120)
        status = self.get_mirror_daemon_status(self.primary_fs_name, self.primary_fs_id)
        peer = status['filesystems'][0]['peers'][0]
        self.assertEquals(status['filesystems'][0]['directory_count'], 1)
        # failure and recovery count should be reflected
        self.assertEquals(peer['stats']['failure_count'], 1)
        self.assertEquals(peer['stats']['recovery_count'], 1)

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_mirroring_init_failure(self):
        """Test mirror daemon init failure"""

        # disable the mgr mirroring plugin as it would try to load the dir map
        # when mirroring is enabled for a filesystem (and throw up errors in
        # the logs)
        self.disable_mirroring_module()

        # enable mirroring through mon interface -- this should result in the mirror daemon
        # failing to enable mirroring due to absence of `cephfs_mirror` index object.
        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "mirror", "enable", self.primary_fs_name)

        with safe_while(sleep=5, tries=10, action='wait for failed state') as proceed:
            while proceed():
                try:
                    # verify via asok
                    res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
                                                     'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
                    if not 'state' in res:
                        return
                    self.assertTrue(res['state'] == "failed")
                    return True
                except:
                    pass

        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "mirror", "disable", self.primary_fs_name)
        time.sleep(10)
        # verify via asok
        try:
            self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
                                       'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
        except CommandFailedError:
            pass
        else:
            raise RuntimeError('expected admin socket to be unavailable')

    def test_mirroring_init_failure_with_recovery(self):
        """Test if the mirror daemon can recover from an init failure"""

        # disable the mgr mirroring plugin as it would try to load the dir map
        # when mirroring is enabled for a filesystem (and throw up errors in
        # the logs)
        self.disable_mirroring_module()

        # enable mirroring through mon interface -- this should result in the mirror daemon
        # failing to enable mirroring due to absence of `cephfs_mirror` index object.

        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "mirror", "enable", self.primary_fs_name)
        # need safe_while since non-failed status pops up as mirroring is restarted
        # internally in mirror daemon.
        with safe_while(sleep=5, tries=20, action='wait for failed state') as proceed:
            while proceed():
                try:
                    # verify via asok
                    res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
                                                     'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
                    if not 'state' in res:
                        return
                    self.assertTrue(res['state'] == "failed")
                    return True
                except:
                    pass

        # create the index object and check daemon recovery
        try:
            p = self.mount_a.client_remote.run(args=['rados', '-p', self.fs.metadata_pool_name, 'create', 'cephfs_mirror'],
                                               stdout=StringIO(), stderr=StringIO(), timeout=30,
                                               check_status=True, label="create index object")
            p.wait()
        except CommandFailedError as ce:
            log.warning(f'mirror daemon command to create mirror index object failed: {ce}')
            raise
        time.sleep(30)
        res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
                                         'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
        self.assertTrue(res['peers'] == {})
        self.assertTrue(res['snap_dirs']['dir_count'] == 0)

        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "mirror", "disable", self.primary_fs_name)
        time.sleep(10)
        # verify via asok
        try:
            self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
                                       'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
        except CommandFailedError:
            pass
        else:
            raise RuntimeError('expected admin socket to be unavailable')

    def test_cephfs_mirror_peer_bootstrap(self):
        """Test importing peer bootstrap token"""
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)

        # create a bootstrap token for the peer
        bootstrap_token = self.bootstrap_peer(self.secondary_fs_name, "client.mirror_peer_bootstrap", "site-remote")

        # import the peer via bootstrap token
        self.import_peer(self.primary_fs_name, bootstrap_token)
        time.sleep(10)
        self.verify_peer_added(self.primary_fs_name, self.primary_fs_id, "client.mirror_peer_bootstrap@site-remote",
                               self.secondary_fs_name)

        # verify via peer_list interface
        peer_uuid = self.get_peer_uuid("client.mirror_peer_bootstrap@site-remote")
        res = json.loads(self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_list", self.primary_fs_name))
        self.assertTrue(peer_uuid in res)
        self.assertTrue('mon_host' in res[peer_uuid] and res[peer_uuid]['mon_host'] != '')

        # remove peer
        self.peer_remove(self.primary_fs_name, self.primary_fs_id, "client.mirror_peer_bootstrap@site-remote")
        # disable mirroring
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_symlink_sync(self):
        log.debug('reconfigure client auth caps')
        self.mds_cluster.mon_manager.raw_cluster_cmd_result(
            'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
                'mds', 'allow rw',
                'mon', 'allow r',
                'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
                    self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))

        log.debug(f'mounting filesystem {self.secondary_fs_name}')
        self.mount_b.umount_wait()
        self.mount_b.mount(cephfs_name=self.secondary_fs_name)

        # create a bunch of files w/ symbolic links in a directory to snap
        self.mount_a.run_shell(["mkdir", "d0"])
        self.mount_a.create_n_files('d0/file', 10, sync=True)
        self.mount_a.run_shell(["ln", "-s", "./file_0", "d0/sym_0"])
        self.mount_a.run_shell(["ln", "-s", "./file_1", "d0/sym_1"])
        self.mount_a.run_shell(["ln", "-s", "./file_2", "d0/sym_2"])

        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        # take a snapshot
        self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])

        time.sleep(30)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", '/d0', 'snap0', 1)
        self.verify_snapshot('d0', 'snap0')

        self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_with_parent_snapshot(self):
        """Test snapshot synchronization with parent directory snapshots"""
        self.mount_a.run_shell(["mkdir", "-p", "d0/d1/d2/d3"])

        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0/d1/d2/d3')
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        # take a snapshot
        self.mount_a.run_shell(["mkdir", "d0/d1/d2/d3/.snap/snap0"])

        time.sleep(30)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", '/d0/d1/d2/d3', 'snap0', 1)

        # create snapshots in parent directories
        self.mount_a.run_shell(["mkdir", "d0/.snap/snap_d0"])
        self.mount_a.run_shell(["mkdir", "d0/d1/.snap/snap_d1"])
        self.mount_a.run_shell(["mkdir", "d0/d1/d2/.snap/snap_d2"])

        # try syncing more snapshots
        self.mount_a.run_shell(["mkdir", "d0/d1/d2/d3/.snap/snap1"])
        time.sleep(30)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", '/d0/d1/d2/d3', 'snap1', 2)

        self.mount_a.run_shell(["rmdir", "d0/d1/d2/d3/.snap/snap0"])
        self.mount_a.run_shell(["rmdir", "d0/d1/d2/d3/.snap/snap1"])
        time.sleep(15)
        self.check_peer_status_deleted_snap(self.primary_fs_name, self.primary_fs_id,
                                            "client.mirror_remote@ceph", '/d0/d1/d2/d3', 2)

        self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0/d1/d2/d3')
        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_remove_on_stall(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)

        # fetch rados address for blocklist check
        rados_inst = self.get_mirror_rados_addr(self.primary_fs_name, self.primary_fs_id)

        # simulate non-responding mirror daemon by sending SIGSTOP
        pid = self.get_mirror_daemon_pid()
        log.debug(f'SIGSTOP to cephfs-mirror pid {pid}')
        self.mount_a.run_shell(['kill', '-SIGSTOP', pid])

        # wait for blocklist timeout -- the manager module would blocklist
        # the mirror daemon
        time.sleep(40)

        # make sure the rados addr is blocklisted
        blocklist = self.get_blocklisted_instances()
        self.assertTrue(rados_inst in blocklist)

        # now we are sure that there are no "active" mirror daemons -- add a directory path.
        dir_path_p = "/d0/d1"
        dir_path = "/d0/d1/d2"

        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "add", self.primary_fs_name, dir_path)

        time.sleep(10)
        # this uses an undocumented interface to get dirpath map state
        res_json = self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "dirmap", self.primary_fs_name, dir_path)
        res = json.loads(res_json)
        # there are no mirror daemons, so the path should be stalled
        self.assertTrue(res['state'] == 'stalled')

        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "remove", self.primary_fs_name, dir_path)

        time.sleep(10)
        try:
            self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "dirmap", self.primary_fs_name, dir_path)
        except CommandFailedError as ce:
            if ce.exitstatus != errno.ENOENT:
                raise RuntimeError('invalid errno when checking dirmap status for non-existent directory')
        else:
            raise RuntimeError('expected dirmap command to fail for a non-existent directory')

        # adding a parent directory should be allowed
        self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "add", self.primary_fs_name, dir_path_p)

        time.sleep(10)
        # however, this directory path should get stalled too
        res_json = self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "dirmap", self.primary_fs_name, dir_path_p)
        res = json.loads(res_json)
        # there are no mirror daemons, so the path should be stalled
        self.assertTrue(res['state'] == 'stalled')

        # wake up the mirror daemon -- at this point, the daemon should know
        # that it has been blocklisted
        log.debug('SIGCONT to cephfs-mirror')
        self.mount_a.run_shell(['kill', '-SIGCONT', pid])

        # wait for mirror restart on blocklist
        time.sleep(60)
        res_json = self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "dirmap", self.primary_fs_name, dir_path_p)
        res = json.loads(res_json)
        # the mirror daemon is back, so the path should get mapped
        self.assertTrue(res['state'] == 'mapped')

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_incremental_sync(self):
        """ Test incremental snapshot synchronization (based on mtime differences)."""
        log.debug('reconfigure client auth caps')
        self.mds_cluster.mon_manager.raw_cluster_cmd_result(
            'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
                'mds', 'allow rw',
                'mon', 'allow r',
                'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
                    self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
        log.debug(f'mounting filesystem {self.secondary_fs_name}')
        self.mount_b.umount_wait()
        self.mount_b.mount(cephfs_name=self.secondary_fs_name)

        repo = 'ceph-qa-suite'
        repo_dir = 'ceph_repo'
        repo_path = f'{repo_dir}/{repo}'

        def clone_repo():
            self.mount_a.run_shell([
                'git', 'clone', '--branch', 'giant',
                f'http://github.com/ceph/{repo}', repo_path])

        def exec_git_cmd(cmd_list):
            self.mount_a.run_shell(['git', '--git-dir', f'{self.mount_a.mountpoint}/{repo_path}/.git', *cmd_list])

        self.mount_a.run_shell(["mkdir", repo_dir])
        clone_repo()

        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{repo_path}')
        self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_a'])

        # full copy, takes time
        time.sleep(500)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", f'/{repo_path}', 'snap_a', 1)
        self.verify_snapshot(repo_path, 'snap_a')

        # create some diff
        num = random.randint(5, 20)
        log.debug(f'resetting to HEAD~{num}')
        exec_git_cmd(["reset", "--hard", f'HEAD~{num}'])

        self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_b'])
        # incremental copy, should be fast
        time.sleep(180)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", f'/{repo_path}', 'snap_b', 2)
        self.verify_snapshot(repo_path, 'snap_b')

        # diff again, this time back to HEAD
        log.debug('resetting to HEAD')
        exec_git_cmd(["pull"])

        self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_c'])
        # incremental copy, should be fast
        time.sleep(180)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", f'/{repo_path}', 'snap_c', 3)
        self.verify_snapshot(repo_path, 'snap_c')

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_incremental_sync_with_type_mixup(self):
        """ Test incremental snapshot synchronization with file type changes.

        The same filename exists as a different type in subsequent snapshots.
        This verifies if the mirror daemon can identify the file type mismatch
        and sync the snapshots.

                 \    snap_0     snap_1     snap_2     snap_3
                  \-----------------------------------------------
        file_x   |    reg        sym        dir        reg
                 |
        file_y   |    dir        reg        sym        dir
                 |
        file_z   |    sym        dir        reg        sym
        """
        log.debug('reconfigure client auth caps')
        self.mds_cluster.mon_manager.raw_cluster_cmd_result(
            'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
                'mds', 'allow rw',
                'mon', 'allow r',
                'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
                    self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
        log.debug(f'mounting filesystem {self.secondary_fs_name}')
        self.mount_b.umount_wait()
        self.mount_b.mount(cephfs_name=self.secondary_fs_name)

        typs = deque(['reg', 'dir', 'sym'])
        def cleanup_and_create_with_type(dirname, fnames):
            self.mount_a.run_shell_payload(f"rm -rf {dirname}/*")
            fidx = 0
            for t in typs:
                fname = f'{dirname}/{fnames[fidx]}'
                log.debug(f'file: {fname} type: {t}')
                if t == 'reg':
                    self.mount_a.run_shell(["touch", fname])
                    self.mount_a.write_file(fname, data=fname)
                elif t == 'dir':
                    self.mount_a.run_shell(["mkdir", fname])
                elif t == 'sym':
                    # verify ELOOP in mirror daemon
                    self.mount_a.run_shell(["ln", "-s", "..", fname])
                fidx += 1

        def verify_types(dirname, fnames, snap_name):
            tidx = 0
            for fname in fnames:
                t = self.mount_b.run_shell_payload(f"stat -c %F {dirname}/.snap/{snap_name}/{fname}").stdout.getvalue().strip()
                if typs[tidx] == 'reg':
                    self.assertEquals('regular file', t)
                elif typs[tidx] == 'dir':
                    self.assertEquals('directory', t)
                elif typs[tidx] == 'sym':
                    self.assertEquals('symbolic link', t)
                tidx += 1

        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        self.mount_a.run_shell(["mkdir", "d0"])
        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')

        fnames = ['file_x', 'file_y', 'file_z']
        turns = 0
        while turns != len(typs):
            snapname = f'snap_{turns}'
            cleanup_and_create_with_type('d0', fnames)
            self.mount_a.run_shell(['mkdir', f'd0/.snap/{snapname}'])
            time.sleep(30)
            self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                                   "client.mirror_remote@ceph", '/d0', snapname, turns+1)
            verify_types('d0', fnames, snapname)
            # next type
            typs.rotate(1)
            turns += 1

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_sync_with_purged_snapshot(self):
        """Test snapshot synchronization in the midst of snapshot deletes.

        Delete the previous snapshot while the mirror daemon is figuring out
        incremental differences between the current and previous snapshots. The
        mirror daemon should identify the purge and switch to using remote
        comparison to sync the snapshot (in the next iteration of course).
        """

        log.debug('reconfigure client auth caps')
        self.mds_cluster.mon_manager.raw_cluster_cmd_result(
            'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
                'mds', 'allow rw',
                'mon', 'allow r',
                'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
                    self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
        log.debug(f'mounting filesystem {self.secondary_fs_name}')
        self.mount_b.umount_wait()
        self.mount_b.mount(cephfs_name=self.secondary_fs_name)

        repo = 'ceph-qa-suite'
        repo_dir = 'ceph_repo'
        repo_path = f'{repo_dir}/{repo}'

        def clone_repo():
            self.mount_a.run_shell([
                'git', 'clone', '--branch', 'giant',
                f'http://github.com/ceph/{repo}', repo_path])

        def exec_git_cmd(cmd_list):
            self.mount_a.run_shell(['git', '--git-dir', f'{self.mount_a.mountpoint}/{repo_path}/.git', *cmd_list])

        self.mount_a.run_shell(["mkdir", repo_dir])
        clone_repo()

        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{repo_path}')
        self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_a'])

        # full copy, takes time
        time.sleep(500)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", f'/{repo_path}', 'snap_a', 1)
        self.verify_snapshot(repo_path, 'snap_a')

        # create some diff
        num = random.randint(60, 100)
        log.debug(f'resetting to HEAD~{num}')
        exec_git_cmd(["reset", "--hard", f'HEAD~{num}'])

        self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_b'])

        time.sleep(15)
        self.mount_a.run_shell(['rmdir', f'{repo_path}/.snap/snap_a'])

        # incremental copy but based on remote dir_root
        time.sleep(300)
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", f'/{repo_path}', 'snap_b', 2)
        self.verify_snapshot(repo_path, 'snap_b')

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def test_cephfs_mirror_peer_add_primary(self):
        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
        self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)

        # try adding the primary file system as a peer to the secondary file
        # system
        try:
            self.peer_add(self.secondary_fs_name, self.secondary_fs_id, "client.mirror_remote@ceph", self.primary_fs_name)
        except CommandFailedError as ce:
            if ce.exitstatus != errno.EINVAL:
                raise RuntimeError('invalid errno when adding a primary file system')
        else:
            raise RuntimeError('adding peer should fail')

        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)