import errno
import json
import logging
import os
import time
from collections import deque
from io import StringIO

from teuthology.contextutil import safe_while
from teuthology.exceptions import CommandFailedError

from tasks.cephfs.cephfs_test_case import CephFSTestCase
# Module-level logger used by the helpers and tests in this module.
log = logging.getLogger(__name__)
class TestMirroring(CephFSTestCase):
    """Tests for CephFS snapshot mirroring via the mgr module and the cephfs-mirror daemon."""

    # Presumably makes the CephFSTestCase harness provide a second filesystem
    # (read as self.backup_fs in setUp) -- confirm against the base class.
    REQUIRE_BACKUP_FILESYSTEM = True

    # Name of the mgr module toggled by enable/disable_mirroring_module().
    MODULE_NAME = "mirroring"
25 super(TestMirroring
, self
).setUp()
26 self
.primary_fs_name
= self
.fs
.name
27 self
.primary_fs_id
= self
.fs
.id
28 self
.secondary_fs_name
= self
.backup_fs
.name
29 self
.secondary_fs_id
= self
.backup_fs
.id
30 self
.enable_mirroring_module()
33 self
.disable_mirroring_module()
34 super(TestMirroring
, self
).tearDown()
36 def enable_mirroring_module(self
):
37 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("mgr", "module", "enable", TestMirroring
.MODULE_NAME
)
39 def disable_mirroring_module(self
):
40 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("mgr", "module", "disable", TestMirroring
.MODULE_NAME
)
42 def enable_mirroring(self
, fs_name
, fs_id
):
43 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("fs", "snapshot", "mirror", "enable", fs_name
)
46 res
= self
.mirror_daemon_command(f
'mirror status for fs: {fs_name}',
47 'fs', 'mirror', 'status', f
'{fs_name}@{fs_id}')
48 self
.assertTrue(res
['peers'] == {})
49 self
.assertTrue(res
['snap_dirs']['dir_count'] == 0)
51 def disable_mirroring(self
, fs_name
, fs_id
):
52 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("fs", "snapshot", "mirror", "disable", fs_name
)
56 self
.mirror_daemon_command(f
'mirror status for fs: {fs_name}',
57 'fs', 'mirror', 'status', f
'{fs_name}@{fs_id}')
58 except CommandFailedError
:
61 raise RuntimeError('expected admin socket to be unavailable')
63 def verify_peer_added(self
, fs_name
, fs_id
, peer_spec
, remote_fs_name
=None):
65 res
= self
.mirror_daemon_command(f
'mirror status for fs: {fs_name}',
66 'fs', 'mirror', 'status', f
'{fs_name}@{fs_id}')
67 peer_uuid
= self
.get_peer_uuid(peer_spec
)
68 self
.assertTrue(peer_uuid
in res
['peers'])
69 client_name
= res
['peers'][peer_uuid
]['remote']['client_name']
70 cluster_name
= res
['peers'][peer_uuid
]['remote']['cluster_name']
71 self
.assertTrue(peer_spec
== f
'{client_name}@{cluster_name}')
73 self
.assertTrue(self
.secondary_fs_name
== res
['peers'][peer_uuid
]['remote']['fs_name'])
75 self
.assertTrue(self
.fs_name
== res
['peers'][peer_uuid
]['remote']['fs_name'])
77 def peer_add(self
, fs_name
, fs_id
, peer_spec
, remote_fs_name
=None):
79 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_add", fs_name
, peer_spec
, remote_fs_name
)
81 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_add", fs_name
, peer_spec
)
83 self
.verify_peer_added(fs_name
, fs_id
, peer_spec
, remote_fs_name
)
85 def peer_remove(self
, fs_name
, fs_id
, peer_spec
):
86 peer_uuid
= self
.get_peer_uuid(peer_spec
)
87 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_remove", fs_name
, peer_uuid
)
90 res
= self
.mirror_daemon_command(f
'mirror status for fs: {fs_name}',
91 'fs', 'mirror', 'status', f
'{fs_name}@{fs_id}')
92 self
.assertTrue(res
['peers'] == {} and res
['snap_dirs']['dir_count'] == 0)
94 def bootstrap_peer(self
, fs_name
, client_name
, site_name
):
95 outj
= json
.loads(self
.mgr_cluster
.mon_manager
.raw_cluster_cmd(
96 "fs", "snapshot", "mirror", "peer_bootstrap", "create", fs_name
, client_name
, site_name
))
99 def import_peer(self
, fs_name
, token
):
100 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_bootstrap", "import",
103 def add_directory(self
, fs_name
, fs_id
, dir_name
):
104 # get initial dir count
105 res
= self
.mirror_daemon_command(f
'mirror status for fs: {fs_name}',
106 'fs', 'mirror', 'status', f
'{fs_name}@{fs_id}')
107 dir_count
= res
['snap_dirs']['dir_count']
108 log
.debug(f
'initial dir_count={dir_count}')
110 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("fs", "snapshot", "mirror", "add", fs_name
, dir_name
)
114 res
= self
.mirror_daemon_command(f
'mirror status for fs: {fs_name}',
115 'fs', 'mirror', 'status', f
'{fs_name}@{fs_id}')
116 new_dir_count
= res
['snap_dirs']['dir_count']
117 log
.debug(f
'new dir_count={new_dir_count}')
118 self
.assertTrue(new_dir_count
> dir_count
)
120 def remove_directory(self
, fs_name
, fs_id
, dir_name
):
121 # get initial dir count
122 res
= self
.mirror_daemon_command(f
'mirror status for fs: {fs_name}',
123 'fs', 'mirror', 'status', f
'{fs_name}@{fs_id}')
124 dir_count
= res
['snap_dirs']['dir_count']
125 log
.debug(f
'initial dir_count={dir_count}')
127 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("fs", "snapshot", "mirror", "remove", fs_name
, dir_name
)
131 res
= self
.mirror_daemon_command(f
'mirror status for fs: {fs_name}',
132 'fs', 'mirror', 'status', f
'{fs_name}@{fs_id}')
133 new_dir_count
= res
['snap_dirs']['dir_count']
134 log
.debug(f
'new dir_count={new_dir_count}')
135 self
.assertTrue(new_dir_count
< dir_count
)
137 def check_peer_status(self
, fs_name
, fs_id
, peer_spec
, dir_name
, expected_snap_name
,
138 expected_snap_count
):
139 peer_uuid
= self
.get_peer_uuid(peer_spec
)
140 res
= self
.mirror_daemon_command(f
'peer status for fs: {fs_name}',
141 'fs', 'mirror', 'peer', 'status',
142 f
'{fs_name}@{fs_id}', peer_uuid
)
143 self
.assertTrue(dir_name
in res
)
144 self
.assertTrue(res
[dir_name
]['last_synced_snap']['name'] == expected_snap_name
)
145 self
.assertTrue(res
[dir_name
]['snaps_synced'] == expected_snap_count
)
147 def check_peer_status_deleted_snap(self
, fs_name
, fs_id
, peer_spec
, dir_name
,
148 expected_delete_count
):
149 peer_uuid
= self
.get_peer_uuid(peer_spec
)
150 res
= self
.mirror_daemon_command(f
'peer status for fs: {fs_name}',
151 'fs', 'mirror', 'peer', 'status',
152 f
'{fs_name}@{fs_id}', peer_uuid
)
153 self
.assertTrue(dir_name
in res
)
154 self
.assertTrue(res
[dir_name
]['snaps_deleted'] == expected_delete_count
)
156 def check_peer_status_renamed_snap(self
, fs_name
, fs_id
, peer_spec
, dir_name
,
157 expected_rename_count
):
158 peer_uuid
= self
.get_peer_uuid(peer_spec
)
159 res
= self
.mirror_daemon_command(f
'peer status for fs: {fs_name}',
160 'fs', 'mirror', 'peer', 'status',
161 f
'{fs_name}@{fs_id}', peer_uuid
)
162 self
.assertTrue(dir_name
in res
)
163 self
.assertTrue(res
[dir_name
]['snaps_renamed'] == expected_rename_count
)
165 def check_peer_snap_in_progress(self
, fs_name
, fs_id
,
166 peer_spec
, dir_name
, snap_name
):
167 peer_uuid
= self
.get_peer_uuid(peer_spec
)
168 res
= self
.mirror_daemon_command(f
'peer status for fs: {fs_name}',
169 'fs', 'mirror', 'peer', 'status',
170 f
'{fs_name}@{fs_id}', peer_uuid
)
171 self
.assertTrue('syncing' == res
[dir_name
]['state'])
172 self
.assertTrue(res
[dir_name
]['current_sycning_snap']['name'] == snap_name
)
174 def verify_snapshot(self
, dir_name
, snap_name
):
175 snap_list
= self
.mount_b
.ls(path
=f
'{dir_name}/.snap')
176 self
.assertTrue(snap_name
in snap_list
)
178 source_res
= self
.mount_a
.dir_checksum(path
=f
'{dir_name}/.snap/{snap_name}',
179 follow_symlinks
=True)
180 log
.debug(f
'source snapshot checksum {snap_name} {source_res}')
182 dest_res
= self
.mount_b
.dir_checksum(path
=f
'{dir_name}/.snap/{snap_name}',
183 follow_symlinks
=True)
184 log
.debug(f
'destination snapshot checksum {snap_name} {dest_res}')
185 self
.assertTrue(source_res
== dest_res
)
187 def verify_failed_directory(self
, fs_name
, fs_id
, peer_spec
, dir_name
):
188 peer_uuid
= self
.get_peer_uuid(peer_spec
)
189 res
= self
.mirror_daemon_command(f
'peer status for fs: {fs_name}',
190 'fs', 'mirror', 'peer', 'status',
191 f
'{fs_name}@{fs_id}', peer_uuid
)
192 self
.assertTrue('failed' == res
[dir_name
]['state'])
194 def get_peer_uuid(self
, peer_spec
):
195 status
= self
.fs
.status()
196 fs_map
= status
.get_fsmap_byname(self
.primary_fs_name
)
197 peers
= fs_map
['mirror_info']['peers']
198 for peer_uuid
, mirror_info
in peers
.items():
199 client_name
= mirror_info
['remote']['client_name']
200 cluster_name
= mirror_info
['remote']['cluster_name']
201 remote_peer_spec
= f
'{client_name}@{cluster_name}'
202 if peer_spec
== remote_peer_spec
:
206 def get_daemon_admin_socket(self
):
207 """overloaded by teuthology override (fs/mirror/clients/mirror.yaml)"""
208 return "/var/run/ceph/cephfs-mirror.asok"
210 def get_mirror_daemon_pid(self
):
211 """pid file overloaded in fs/mirror/clients/mirror.yaml"""
212 return self
.mount_a
.run_shell(['cat', '/var/run/ceph/cephfs-mirror.pid']).stdout
.getvalue().strip()
214 def get_mirror_rados_addr(self
, fs_name
, fs_id
):
215 """return the rados addr used by cephfs-mirror instance"""
216 res
= self
.mirror_daemon_command(f
'mirror status for fs: {fs_name}',
217 'fs', 'mirror', 'status', f
'{fs_name}@{fs_id}')
218 return res
['rados_inst']
220 def mirror_daemon_command(self
, cmd_label
, *args
):
221 asok_path
= self
.get_daemon_admin_socket()
223 # use mount_a's remote to execute command
224 p
= self
.mount_a
.client_remote
.run(args
=
225 ['ceph', '--admin-daemon', asok_path
] + list(args
),
226 stdout
=StringIO(), stderr
=StringIO(), timeout
=30,
227 check_status
=True, label
=cmd_label
)
229 except CommandFailedError
as ce
:
230 log
.warn(f
'mirror daemon command with label "{cmd_label}" failed: {ce}')
232 res
= p
.stdout
.getvalue().strip()
233 log
.debug(f
'command returned={res}')
234 return json
.loads(res
)
236 def get_mirror_daemon_status(self
):
237 daemon_status
= json
.loads(self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("fs", "snapshot", "mirror", "daemon", "status"))
238 log
.debug(f
'daemon_status: {daemon_status}')
239 # running a single mirror daemon is supported
240 status
= daemon_status
[0]
241 log
.debug(f
'status: {status}')
244 def test_basic_mirror_commands(self
):
245 self
.enable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
246 self
.disable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
248 def test_mirror_peer_commands(self
):
249 self
.enable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
252 self
.peer_add(self
.primary_fs_name
, self
.primary_fs_id
, "client.mirror_remote@ceph", self
.secondary_fs_name
)
254 self
.peer_remove(self
.primary_fs_name
, self
.primary_fs_id
, "client.mirror_remote@ceph")
256 self
.disable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
258 def test_mirror_disable_with_peer(self
):
259 self
.enable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
262 self
.peer_add(self
.primary_fs_name
, self
.primary_fs_id
, "client.mirror_remote@ceph", self
.secondary_fs_name
)
264 self
.disable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
266 def test_matching_peer(self
):
267 self
.enable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
270 self
.peer_add(self
.primary_fs_name
, self
.primary_fs_id
, "client.mirror_remote@ceph")
271 except CommandFailedError
as ce
:
272 if ce
.exitstatus
!= errno
.EINVAL
:
273 raise RuntimeError('invalid errno when adding a matching remote peer')
275 raise RuntimeError('adding a peer matching local spec should fail')
277 # verify via asok -- nothing should get added
278 res
= self
.mirror_daemon_command(f
'mirror status for fs: {self.primary_fs_name}',
279 'fs', 'mirror', 'status', f
'{self.primary_fs_name}@{self.primary_fs_id}')
280 self
.assertTrue(res
['peers'] == {})
282 # and explicitly specifying the spec (via filesystem name) should fail too
284 self
.peer_add(self
.primary_fs_name
, self
.primary_fs_id
, "client.mirror_remote@ceph", self
.primary_fs_name
)
285 except CommandFailedError
as ce
:
286 if ce
.exitstatus
!= errno
.EINVAL
:
287 raise RuntimeError('invalid errno when adding a matching remote peer')
289 raise RuntimeError('adding a peer matching local spec should fail')
291 # verify via asok -- nothing should get added
292 res
= self
.mirror_daemon_command(f
'mirror status for fs: {self.primary_fs_name}',
293 'fs', 'mirror', 'status', f
'{self.primary_fs_name}@{self.primary_fs_id}')
294 self
.assertTrue(res
['peers'] == {})
296 self
.disable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
298 def test_mirror_peer_add_existing(self
):
299 self
.enable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
302 self
.peer_add(self
.primary_fs_name
, self
.primary_fs_id
, "client.mirror_remote@ceph", self
.secondary_fs_name
)
304 # adding the same peer should be idempotent
305 self
.peer_add(self
.primary_fs_name
, self
.primary_fs_id
, "client.mirror_remote@ceph", self
.secondary_fs_name
)
308 self
.peer_remove(self
.primary_fs_name
, self
.primary_fs_id
, "client.mirror_remote@ceph")
310 self
.disable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
312 def test_peer_commands_with_mirroring_disabled(self
):
313 # try adding peer when mirroring is not enabled
315 self
.peer_add(self
.primary_fs_name
, self
.primary_fs_id
, "client.mirror_remote@ceph", self
.secondary_fs_name
)
316 except CommandFailedError
as ce
:
317 if ce
.exitstatus
!= errno
.EINVAL
:
318 raise RuntimeError(-errno
.EINVAL
, 'incorrect error code when adding a peer')
320 raise RuntimeError(-errno
.EINVAL
, 'expected peer_add to fail')
324 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_remove", self
.primary_fs_name
, 'dummy-uuid')
325 except CommandFailedError
as ce
:
326 if ce
.exitstatus
!= errno
.EINVAL
:
327 raise RuntimeError(-errno
.EINVAL
, 'incorrect error code when removing a peer')
329 raise RuntimeError(-errno
.EINVAL
, 'expected peer_remove to fail')
331 def test_add_directory_with_mirroring_disabled(self
):
332 # try adding a directory when mirroring is not enabled
334 self
.add_directory(self
.primary_fs_name
, self
.primary_fs_id
, "/d1")
335 except CommandFailedError
as ce
:
336 if ce
.exitstatus
!= errno
.EINVAL
:
337 raise RuntimeError(-errno
.EINVAL
, 'incorrect error code when adding a directory')
339 raise RuntimeError(-errno
.EINVAL
, 'expected directory add to fail')
341 def test_directory_commands(self
):
342 self
.mount_a
.run_shell(["mkdir", "d1"])
343 self
.enable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
344 self
.add_directory(self
.primary_fs_name
, self
.primary_fs_id
, '/d1')
346 self
.add_directory(self
.primary_fs_name
, self
.primary_fs_id
, '/d1')
347 except CommandFailedError
as ce
:
348 if ce
.exitstatus
!= errno
.EEXIST
:
349 raise RuntimeError(-errno
.EINVAL
, 'incorrect error code when re-adding a directory')
351 raise RuntimeError(-errno
.EINVAL
, 'expected directory add to fail')
352 self
.remove_directory(self
.primary_fs_name
, self
.primary_fs_id
, '/d1')
354 self
.remove_directory(self
.primary_fs_name
, self
.primary_fs_id
, '/d1')
355 except CommandFailedError
as ce
:
356 if ce
.exitstatus
not in (errno
.ENOENT
, errno
.EINVAL
):
357 raise RuntimeError(-errno
.EINVAL
, 'incorrect error code when re-deleting a directory')
359 raise RuntimeError(-errno
.EINVAL
, 'expected directory removal to fail')
360 self
.disable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
361 self
.mount_a
.run_shell(["rmdir", "d1"])
363 def test_add_relative_directory_path(self
):
364 self
.enable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
366 self
.add_directory(self
.primary_fs_name
, self
.primary_fs_id
, './d1')
367 except CommandFailedError
as ce
:
368 if ce
.exitstatus
!= errno
.EINVAL
:
369 raise RuntimeError(-errno
.EINVAL
, 'incorrect error code when adding a relative path dir')
371 raise RuntimeError(-errno
.EINVAL
, 'expected directory add to fail')
372 self
.disable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
374 def test_add_directory_path_normalization(self
):
375 self
.mount_a
.run_shell(["mkdir", "-p", "d1/d2/d3"])
376 self
.enable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
377 self
.add_directory(self
.primary_fs_name
, self
.primary_fs_id
, '/d1/d2/d3')
378 def check_add_command_failure(dir_path
):
380 self
.add_directory(self
.primary_fs_name
, self
.primary_fs_id
, dir_path
)
381 except CommandFailedError
as ce
:
382 if ce
.exitstatus
!= errno
.EEXIST
:
383 raise RuntimeError(-errno
.EINVAL
, 'incorrect error code when re-adding a directory')
385 raise RuntimeError(-errno
.EINVAL
, 'expected directory add to fail')
387 # everything points for /d1/d2/d3
388 check_add_command_failure('/d1/d2/././././././d3')
389 check_add_command_failure('/d1/d2/././././././d3//////')
390 check_add_command_failure('/d1/d2/../d2/././././d3')
391 check_add_command_failure('/././././d1/./././d2/./././d3//////')
392 check_add_command_failure('/./d1/./d2/./d3/../../../d1/d2/d3')
394 self
.disable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
395 self
.mount_a
.run_shell(["rm", "-rf", "d1"])
397 def test_add_ancestor_and_child_directory(self
):
398 self
.mount_a
.run_shell(["mkdir", "-p", "d1/d2/d3"])
399 self
.mount_a
.run_shell(["mkdir", "-p", "d1/d4"])
400 self
.enable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
401 self
.add_directory(self
.primary_fs_name
, self
.primary_fs_id
, '/d1/d2/')
402 def check_add_command_failure(dir_path
):
404 self
.add_directory(self
.primary_fs_name
, self
.primary_fs_id
, dir_path
)
405 except CommandFailedError
as ce
:
406 if ce
.exitstatus
!= errno
.EINVAL
:
407 raise RuntimeError(-errno
.EINVAL
, 'incorrect error code when adding a directory')
409 raise RuntimeError(-errno
.EINVAL
, 'expected directory add to fail')
411 # cannot add ancestors or a subtree for an existing directory
412 check_add_command_failure('/')
413 check_add_command_failure('/d1')
414 check_add_command_failure('/d1/d2/d3')
416 # obviously, one can add a non-ancestor or non-subtree
417 self
.add_directory(self
.primary_fs_name
, self
.primary_fs_id
, '/d1/d4/')
419 self
.disable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
420 self
.mount_a
.run_shell(["rm", "-rf", "d1"])
422 def test_cephfs_mirror_blocklist(self
):
423 self
.enable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
426 self
.peer_add(self
.primary_fs_name
, self
.primary_fs_id
, "client.mirror_remote@ceph", self
.secondary_fs_name
)
428 res
= self
.mirror_daemon_command(f
'mirror status for fs: {self.primary_fs_name}',
429 'fs', 'mirror', 'status', f
'{self.primary_fs_name}@{self.primary_fs_id}')
430 peers_1
= set(res
['peers'])
432 # fetch rados address for blacklist check
433 rados_inst
= self
.get_mirror_rados_addr(self
.primary_fs_name
, self
.primary_fs_id
)
435 # simulate non-responding mirror daemon by sending SIGSTOP
436 pid
= self
.get_mirror_daemon_pid()
437 log
.debug(f
'SIGSTOP to cephfs-mirror pid {pid}')
438 self
.mount_a
.run_shell(['kill', '-SIGSTOP', pid
])
440 # wait for blocklist timeout -- the manager module would blocklist
444 # wake up the mirror daemon -- at this point, the daemon should know
445 # that it has been blocklisted
446 log
.debug('SIGCONT to cephfs-mirror')
447 self
.mount_a
.run_shell(['kill', '-SIGCONT', pid
])
449 # check if the rados addr is blocklisted
450 self
.assertTrue(self
.mds_cluster
.is_addr_blocklisted(rados_inst
))
452 # wait enough so that the mirror daemon restarts blocklisted instances
454 rados_inst_new
= self
.get_mirror_rados_addr(self
.primary_fs_name
, self
.primary_fs_id
)
456 # and we should get a new rados instance
457 self
.assertTrue(rados_inst
!= rados_inst_new
)
459 # along with peers that were added
460 res
= self
.mirror_daemon_command(f
'mirror status for fs: {self.primary_fs_name}',
461 'fs', 'mirror', 'status', f
'{self.primary_fs_name}@{self.primary_fs_id}')
462 peers_2
= set(res
['peers'])
463 self
.assertTrue(peers_1
, peers_2
)
465 self
.disable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
467 def test_cephfs_mirror_stats(self
):
468 log
.debug('reconfigure client auth caps')
469 self
.mds_cluster
.mon_manager
.raw_cluster_cmd_result(
470 'auth', 'caps', "client.{0}".format(self
.mount_b
.client_id
),
473 'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
474 self
.backup_fs
.get_data_pool_name(), self
.backup_fs
.get_data_pool_name()))
476 log
.debug(f
'mounting filesystem {self.secondary_fs_name}')
477 self
.mount_b
.umount_wait()
478 self
.mount_b
.mount_wait(cephfs_name
=self
.secondary_fs_name
)
480 # create a bunch of files in a directory to snap
481 self
.mount_a
.run_shell(["mkdir", "d0"])
482 self
.mount_a
.create_n_files('d0/file', 50, sync
=True)
484 self
.enable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
485 self
.add_directory(self
.primary_fs_name
, self
.primary_fs_id
, '/d0')
486 self
.peer_add(self
.primary_fs_name
, self
.primary_fs_id
, "client.mirror_remote@ceph", self
.secondary_fs_name
)
489 self
.mount_a
.run_shell(["mkdir", "d0/.snap/snap0"])
492 self
.check_peer_status(self
.primary_fs_name
, self
.primary_fs_id
,
493 "client.mirror_remote@ceph", '/d0', 'snap0', 1)
494 self
.verify_snapshot('d0', 'snap0')
497 self
.mount_a
.run_shell(["mkdir", "d0/d00"])
498 self
.mount_a
.run_shell(["mkdir", "d0/d01"])
500 self
.mount_a
.create_n_files('d0/d00/more_file', 20, sync
=True)
501 self
.mount_a
.create_n_files('d0/d01/some_more_file', 75, sync
=True)
503 # take another snapshot
504 self
.mount_a
.run_shell(["mkdir", "d0/.snap/snap1"])
507 self
.check_peer_status(self
.primary_fs_name
, self
.primary_fs_id
,
508 "client.mirror_remote@ceph", '/d0', 'snap1', 2)
509 self
.verify_snapshot('d0', 'snap1')
512 self
.mount_a
.run_shell(["rmdir", "d0/.snap/snap0"])
515 snap_list
= self
.mount_b
.ls(path
='d0/.snap')
516 self
.assertTrue('snap0' not in snap_list
)
517 self
.check_peer_status_deleted_snap(self
.primary_fs_name
, self
.primary_fs_id
,
518 "client.mirror_remote@ceph", '/d0', 1)
521 self
.mount_a
.run_shell(["mv", "d0/.snap/snap1", "d0/.snap/snap2"])
524 snap_list
= self
.mount_b
.ls(path
='d0/.snap')
525 self
.assertTrue('snap1' not in snap_list
)
526 self
.assertTrue('snap2' in snap_list
)
527 self
.check_peer_status_renamed_snap(self
.primary_fs_name
, self
.primary_fs_id
,
528 "client.mirror_remote@ceph", '/d0', 1)
530 self
.remove_directory(self
.primary_fs_name
, self
.primary_fs_id
, '/d0')
531 self
.disable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
533 def test_cephfs_mirror_cancel_sync(self
):
534 log
.debug('reconfigure client auth caps')
535 self
.mds_cluster
.mon_manager
.raw_cluster_cmd_result(
536 'auth', 'caps', "client.{0}".format(self
.mount_b
.client_id
),
539 'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
540 self
.backup_fs
.get_data_pool_name(), self
.backup_fs
.get_data_pool_name()))
542 log
.debug(f
'mounting filesystem {self.secondary_fs_name}')
543 self
.mount_b
.umount_wait()
544 self
.mount_b
.mount_wait(cephfs_name
=self
.secondary_fs_name
)
546 # create a bunch of files in a directory to snap
547 self
.mount_a
.run_shell(["mkdir", "d0"])
549 filename
= f
'file.{i}'
550 self
.mount_a
.write_n_mb(os
.path
.join('d0', filename
), 1024)
552 self
.enable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
553 self
.add_directory(self
.primary_fs_name
, self
.primary_fs_id
, '/d0')
554 self
.peer_add(self
.primary_fs_name
, self
.primary_fs_id
, "client.mirror_remote@ceph", self
.secondary_fs_name
)
557 self
.mount_a
.run_shell(["mkdir", "d0/.snap/snap0"])
560 self
.check_peer_snap_in_progress(self
.primary_fs_name
, self
.primary_fs_id
,
561 "client.mirror_remote@ceph", '/d0', 'snap0')
563 self
.remove_directory(self
.primary_fs_name
, self
.primary_fs_id
, '/d0')
565 snap_list
= self
.mount_b
.ls(path
='d0/.snap')
566 self
.assertTrue('snap0' not in snap_list
)
567 self
.disable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
569 def test_cephfs_mirror_restart_sync_on_blocklist(self
):
570 log
.debug('reconfigure client auth caps')
571 self
.mds_cluster
.mon_manager
.raw_cluster_cmd_result(
572 'auth', 'caps', "client.{0}".format(self
.mount_b
.client_id
),
575 'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
576 self
.backup_fs
.get_data_pool_name(), self
.backup_fs
.get_data_pool_name()))
578 log
.debug(f
'mounting filesystem {self.secondary_fs_name}')
579 self
.mount_b
.umount_wait()
580 self
.mount_b
.mount_wait(cephfs_name
=self
.secondary_fs_name
)
582 # create a bunch of files in a directory to snap
583 self
.mount_a
.run_shell(["mkdir", "d0"])
585 filename
= f
'file.{i}'
586 self
.mount_a
.write_n_mb(os
.path
.join('d0', filename
), 1024)
588 self
.enable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
589 self
.add_directory(self
.primary_fs_name
, self
.primary_fs_id
, '/d0')
590 self
.peer_add(self
.primary_fs_name
, self
.primary_fs_id
, "client.mirror_remote@ceph", self
.secondary_fs_name
)
592 # fetch rados address for blacklist check
593 rados_inst
= self
.get_mirror_rados_addr(self
.primary_fs_name
, self
.primary_fs_id
)
596 self
.mount_a
.run_shell(["mkdir", "d0/.snap/snap0"])
599 self
.check_peer_snap_in_progress(self
.primary_fs_name
, self
.primary_fs_id
,
600 "client.mirror_remote@ceph", '/d0', 'snap0')
602 # simulate non-responding mirror daemon by sending SIGSTOP
603 pid
= self
.get_mirror_daemon_pid()
604 log
.debug(f
'SIGSTOP to cephfs-mirror pid {pid}')
605 self
.mount_a
.run_shell(['kill', '-SIGSTOP', pid
])
607 # wait for blocklist timeout -- the manager module would blocklist
611 # wake up the mirror daemon -- at this point, the daemon should know
612 # that it has been blocklisted
613 log
.debug('SIGCONT to cephfs-mirror')
614 self
.mount_a
.run_shell(['kill', '-SIGCONT', pid
])
616 # check if the rados addr is blocklisted
617 self
.assertTrue(self
.mds_cluster
.is_addr_blocklisted(rados_inst
))
620 self
.check_peer_status(self
.primary_fs_name
, self
.primary_fs_id
,
621 "client.mirror_remote@ceph", '/d0', 'snap0', expected_snap_count
=1)
622 self
.verify_snapshot('d0', 'snap0')
624 self
.remove_directory(self
.primary_fs_name
, self
.primary_fs_id
, '/d0')
625 self
.disable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
627 def test_cephfs_mirror_failed_sync_with_correction(self
):
628 self
.enable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
629 self
.peer_add(self
.primary_fs_name
, self
.primary_fs_id
, "client.mirror_remote@ceph", self
.secondary_fs_name
)
631 # add a non-existent directory for synchronization
632 self
.add_directory(self
.primary_fs_name
, self
.primary_fs_id
, '/d0')
634 # wait for mirror daemon to mark it the directory as failed
636 self
.verify_failed_directory(self
.primary_fs_name
, self
.primary_fs_id
,
637 "client.mirror_remote@ceph", '/d0')
639 # create the directory
640 self
.mount_a
.run_shell(["mkdir", "d0"])
641 self
.mount_a
.run_shell(["mkdir", "d0/.snap/snap0"])
643 # wait for correction
645 self
.check_peer_status(self
.primary_fs_name
, self
.primary_fs_id
,
646 "client.mirror_remote@ceph", '/d0', 'snap0', 1)
647 self
.disable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
649 def test_cephfs_mirror_service_daemon_status(self
):
650 self
.enable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
651 self
.peer_add(self
.primary_fs_name
, self
.primary_fs_id
, "client.mirror_remote@ceph", self
.secondary_fs_name
)
654 status
= self
.get_mirror_daemon_status()
656 # assumption for this test: mirroring enabled for a single filesystem w/ single
659 # we have not added any directories
660 peer
= status
['filesystems'][0]['peers'][0]
661 self
.assertEquals(status
['filesystems'][0]['directory_count'], 0)
662 self
.assertEquals(peer
['stats']['failure_count'], 0)
663 self
.assertEquals(peer
['stats']['recovery_count'], 0)
665 # add a non-existent directory for synchronization -- check if its reported
667 self
.add_directory(self
.primary_fs_name
, self
.primary_fs_id
, '/d0')
670 status
= self
.get_mirror_daemon_status()
672 peer
= status
['filesystems'][0]['peers'][0]
673 self
.assertEquals(status
['filesystems'][0]['directory_count'], 1)
674 # failure count should be reflected
675 self
.assertEquals(peer
['stats']['failure_count'], 1)
676 self
.assertEquals(peer
['stats']['recovery_count'], 0)
678 # create the directory, mirror daemon would recover
679 self
.mount_a
.run_shell(["mkdir", "d0"])
682 status
= self
.get_mirror_daemon_status()
683 peer
= status
['filesystems'][0]['peers'][0]
684 self
.assertEquals(status
['filesystems'][0]['directory_count'], 1)
685 # failure and recovery count should be reflected
686 self
.assertEquals(peer
['stats']['failure_count'], 1)
687 self
.assertEquals(peer
['stats']['recovery_count'], 1)
689 self
.disable_mirroring(self
.primary_fs_name
, self
.primary_fs_id
)
691 def test_mirroring_init_failure(self
):
692 """Test mirror daemon init failure"""
694 # disable mgr mirroring plugin as it would try to load dir map on
695 # on mirroring enabled for a filesystem (an throw up erorrs in
697 self
.disable_mirroring_module()
699 # enable mirroring through mon interface -- this should result in the mirror daemon
700 # failing to enable mirroring due to absence of `cephfs_mirorr` index object.
701 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("fs", "mirror", "enable", self
.primary_fs_name
)
703 with
safe_while(sleep
=5, tries
=10, action
='wait for failed state') as proceed
:
707 res
= self
.mirror_daemon_command(f
'mirror status for fs: {self.primary_fs_name}',
708 'fs', 'mirror', 'status', f
'{self.primary_fs_name}@{self.primary_fs_id}')
709 if not 'state' in res
:
711 self
.assertTrue(res
['state'] == "failed")
716 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("fs", "mirror", "disable", self
.primary_fs_name
)
720 self
.mirror_daemon_command(f
'mirror status for fs: {self.primary_fs_name}',
721 'fs', 'mirror', 'status', f
'{self.primary_fs_name}@{self.primary_fs_id}')
722 except CommandFailedError
:
725 raise RuntimeError('expected admin socket to be unavailable')
727 def test_mirroring_init_failure_with_recovery(self
):
728 """Test if the mirror daemon can recover from a init failure"""
730 # disable mgr mirroring plugin as it would try to load dir map on
731 # on mirroring enabled for a filesystem (an throw up erorrs in
733 self
.disable_mirroring_module()
735 # enable mirroring through mon interface -- this should result in the mirror daemon
736 # failing to enable mirroring due to absence of `cephfs_mirror` index object.
738 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("fs", "mirror", "enable", self
.primary_fs_name
)
739 # need safe_while since non-failed status pops up as mirroring is restarted
740 # internally in mirror daemon.
741 with
safe_while(sleep
=5, tries
=20, action
='wait for failed state') as proceed
:
745 res
= self
.mirror_daemon_command(f
'mirror status for fs: {self.primary_fs_name}',
746 'fs', 'mirror', 'status', f
'{self.primary_fs_name}@{self.primary_fs_id}')
747 if not 'state' in res
:
749 self
.assertTrue(res
['state'] == "failed")
754 # create the index object and check daemon recovery
756 p
= self
.mount_a
.client_remote
.run(args
=['rados', '-p', self
.fs
.metadata_pool_name
, 'create', 'cephfs_mirror'],
757 stdout
=StringIO(), stderr
=StringIO(), timeout
=30,
758 check_status
=True, label
="create index object")
760 except CommandFailedError
as ce
:
761 log
.warn(f
'mirror daemon command to create mirror index object failed: {ce}')
764 res
= self
.mirror_daemon_command(f
'mirror status for fs: {self.primary_fs_name}',
765 'fs', 'mirror', 'status', f
'{self.primary_fs_name}@{self.primary_fs_id}')
766 self
.assertTrue(res
['peers'] == {})
767 self
.assertTrue(res
['snap_dirs']['dir_count'] == 0)
769 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("fs", "mirror", "disable", self
.primary_fs_name
)
773 self
.mirror_daemon_command(f
'mirror status for fs: {self.primary_fs_name}',
774 'fs', 'mirror', 'status', f
'{self.primary_fs_name}@{self.primary_fs_id}')
775 except CommandFailedError
:
778 raise RuntimeError('expected admin socket to be unavailable')
def test_cephfs_mirror_peer_bootstrap(self):
    """Import a peer from a bootstrap token and check that it is registered.

    A bootstrap token is created for the secondary file system and imported
    on the primary. The peer must then be reported by the mirror daemon
    status, and the mgr ``peer_list`` output must contain its uuid with a
    non-empty ``mon_host``. Finally the peer is removed and mirroring is
    disabled again.
    """
    fs_name, fs_id = self.primary_fs_name, self.primary_fs_id
    peer_spec = "client.mirror_peer_bootstrap@site-remote"

    self.enable_mirroring(fs_name, fs_id)

    # mint a token for the remote side, then import it on the primary
    token = self.bootstrap_peer(self.secondary_fs_name,
                                "client.mirror_peer_bootstrap", "site-remote")
    self.import_peer(fs_name, token)
    self.verify_peer_added(fs_name, fs_id, peer_spec, self.secondary_fs_name)

    # cross-check through the mgr peer_list interface
    uuid = self.get_peer_uuid(peer_spec)
    peers = json.loads(self.mgr_cluster.mon_manager.raw_cluster_cmd(
        "fs", "snapshot", "mirror", "peer_list", fs_name))
    self.assertTrue(uuid in peers)
    self.assertTrue('mon_host' in peers[uuid] and peers[uuid]['mon_host'] != '')

    self.peer_remove(fs_name, fs_id, peer_spec)
    self.disable_mirroring(fs_name, fs_id)
def test_cephfs_mirror_symlink_sync(self):
    """Mirror a directory containing symbolic links and verify the snapshot.

    ``d0`` is populated with ten regular files plus relative symlinks to
    three of them, then snapshotted. The snapshot must be reported as synced
    to the peer and must match on the secondary file system (mounted on
    ``mount_b``).
    """
    log.debug('reconfigure client auth caps')
    # `ceph auth caps` replaces the entity's whole capability set, so the
    # mon/mds caps required to mount the secondary file system have to be
    # restated alongside the osd caps -- otherwise the mount below fails.
    self.mds_cluster.mon_manager.raw_cluster_cmd_result(
        'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
        'mds', 'allow rw',
        'mon', 'allow r',
        'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
            self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))

    log.debug(f'mounting filesystem {self.secondary_fs_name}')
    self.mount_b.umount_wait()
    self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name)

    # create a bunch of files w/ symbolic links in a directory to snap
    self.mount_a.run_shell(["mkdir", "d0"])
    self.mount_a.create_n_files('d0/file', 10, sync=True)
    for i in range(3):
        self.mount_a.run_shell(["ln", "-s", f"./file_{i}", f"d0/sym_{i}"])

    self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
    self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
    self.peer_add(self.primary_fs_name, self.primary_fs_id,
                  "client.mirror_remote@ceph", self.secondary_fs_name)

    # take a snapshot and check that it was synchronized
    self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])
    self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                           "client.mirror_remote@ceph", '/d0', 'snap0', 1)
    self.verify_snapshot('d0', 'snap0')

    self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
    self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
def test_cephfs_mirror_with_parent_snapshot(self):
    """Test snapshot synchronization with parent directory snapshots"""
    fs_name, fs_id = self.primary_fs_name, self.primary_fs_id
    peer_spec = "client.mirror_remote@ceph"
    mirrored = '/d0/d1/d2/d3'

    self.mount_a.run_shell(["mkdir", "-p", "d0/d1/d2/d3"])

    # mirror only the deepest directory of the hierarchy
    self.enable_mirroring(fs_name, fs_id)
    self.add_directory(fs_name, fs_id, mirrored)
    self.peer_add(fs_name, fs_id, peer_spec, self.secondary_fs_name)

    # first snapshot of the mirrored directory itself
    self.mount_a.run_shell(["mkdir", "d0/d1/d2/d3/.snap/snap0"])
    self.check_peer_status(fs_name, fs_id, peer_spec, mirrored, 'snap0', 1)

    # snapshots taken on each ancestor of the mirrored directory
    for ancestor_snap in ("d0/.snap/snap_d0",
                          "d0/d1/.snap/snap_d1",
                          "d0/d1/d2/.snap/snap_d2"):
        self.mount_a.run_shell(["mkdir", ancestor_snap])

    # syncing must keep working with those ancestor snapshots present
    self.mount_a.run_shell(["mkdir", "d0/d1/d2/d3/.snap/snap1"])
    self.check_peer_status(fs_name, fs_id, peer_spec, mirrored, 'snap1', 2)

    # deleting both mirrored snapshots should be propagated as well
    for snap in ("snap0", "snap1"):
        self.mount_a.run_shell(["rmdir", f"d0/d1/d2/d3/.snap/{snap}"])
    self.check_peer_status_deleted_snap(fs_name, fs_id, peer_spec, mirrored, 2)

    self.remove_directory(fs_name, fs_id, mirrored)
    self.disable_mirroring(fs_name, fs_id)
def test_cephfs_mirror_remove_on_stall(self):
    """Add/remove directory paths while no mirror daemon is active.

    The mirror daemon is frozen with SIGSTOP until the manager module
    blocklists its rados client. Directories added in that window are
    expected to be reported as ``stalled`` by the (undocumented) ``dirmap``
    interface. After a directory is removed, ``dirmap`` on it must fail with
    ENOENT. Once the daemon is resumed with SIGCONT, the remaining directory
    is expected to become ``mapped``.
    """
    self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)

    def dirmap(path):
        # this uses an undocumented interface to get dirpath map state
        return self.mgr_cluster.mon_manager.raw_cluster_cmd(
            "fs", "snapshot", "mirror", "dirmap", self.primary_fs_name, path)

    def wait_for_dirmap_state(path, expected):
        # directory map state changes only once the mgr notices daemon
        # (un)availability, so poll instead of checking once
        with safe_while(sleep=5, tries=24,
                        action=f'wait for dirmap state {expected} on {path}') as proceed:
            while proceed():
                state = json.loads(dirmap(path))['state']
                if state == expected:
                    return
                log.debug(f'dirmap state for {path}: {state}, waiting for {expected}')

    # fetch rados address for blocklist check
    rados_inst = self.get_mirror_rados_addr(self.primary_fs_name, self.primary_fs_id)

    dir_path_p = "/d0/d1"
    dir_path = "/d0/d1/d2"

    # simulate non-responding mirror daemon by sending SIGSTOP
    pid = self.get_mirror_daemon_pid()
    log.debug(f'SIGSTOP to cephfs-mirror pid {pid}')
    self.mount_a.run_shell(['kill', '-SIGSTOP', pid])
    stopped = True
    try:
        # wait for blocklist timeout -- the manager module would blocklist
        # the unresponsive mirror daemon; make sure its rados addr is blocklisted
        with safe_while(sleep=5, tries=24,
                        action='wait for mirror daemon to be blocklisted') as proceed:
            while proceed():
                if self.mds_cluster.is_addr_blocklisted(rados_inst):
                    break

        # now we are sure that there are no "active" mirror daemons -- add a directory path.
        self.mgr_cluster.mon_manager.raw_cluster_cmd(
            "fs", "snapshot", "mirror", "add", self.primary_fs_name, dir_path)
        # there are no mirror daemons
        wait_for_dirmap_state(dir_path, 'stalled')

        self.mgr_cluster.mon_manager.raw_cluster_cmd(
            "fs", "snapshot", "mirror", "remove", self.primary_fs_name, dir_path)
        # the removed directory must no longer have a dirmap entry
        try:
            dirmap(dir_path)
        except CommandFailedError as ce:
            if ce.exitstatus != errno.ENOENT:
                raise RuntimeError('invalid errno when checking dirmap status for non-existent directory') from ce
        else:
            raise RuntimeError('incorrect errno when checking dirmap state for non-existent directory')

        # adding a parent directory should be allowed
        self.mgr_cluster.mon_manager.raw_cluster_cmd(
            "fs", "snapshot", "mirror", "add", self.primary_fs_name, dir_path_p)
        # however, this directory path should get stalled too
        wait_for_dirmap_state(dir_path_p, 'stalled')

        # wake up the mirror daemon -- at this point, the daemon should know
        # that it has been blocklisted
        log.debug('SIGCONT to cephfs-mirror')
        self.mount_a.run_shell(['kill', '-SIGCONT', pid])
        stopped = False
    finally:
        if stopped:
            # never leave a frozen mirror daemon behind for later tests
            try:
                self.mount_a.run_shell(['kill', '-SIGCONT', pid])
            except CommandFailedError as ce:
                log.warning(f'failed to SIGCONT cephfs-mirror pid {pid}: {ce}')

    # wait for restart mirror on blocklist -- the directory should get mapped
    # to the restarted mirror daemon
    wait_for_dirmap_state(dir_path_p, 'mapped')

    self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
def test_cephfs_mirror_incremental_sync(self):
    """Test incremental snapshot synchronization (based on mtime differences).

    A git repository is cloned into a mirrored directory and snapshotted
    (full copy). Its work tree is then reset a random number of commits back
    and snapshotted again, and finally pulled forward and snapshotted a third
    time. Both later snapshots are expected to be synchronized incrementally.
    """
    log.debug('reconfigure client auth caps')
    # `ceph auth caps` replaces the entity's whole capability set, so the
    # mon/mds caps required to mount the secondary file system have to be
    # restated alongside the osd caps.
    self.mds_cluster.mon_manager.raw_cluster_cmd_result(
        'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
        'mds', 'allow rw',
        'mon', 'allow r',
        'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
            self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
    log.debug(f'mounting filesystem {self.secondary_fs_name}')
    self.mount_b.umount_wait()
    self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name)

    repo = 'ceph-qa-suite'
    repo_dir = 'ceph_repo'
    repo_path = f'{repo_dir}/{repo}'

    def exec_git_cmd(cmd_list):
        # Run git from inside the clone. With only --git-dir, git would treat
        # the current directory (the mount root) as the work tree, and
        # `reset --hard` / `pull` would never touch the cloned files.
        self.mount_a.run_shell(['git', '-C', f'{self.mount_a.mountpoint}/{repo_path}', *cmd_list])

    # create the parent first: `git clone` creates repo_dir itself, after
    # which a plain `mkdir repo_dir` would fail with EEXIST
    self.mount_a.run_shell(["mkdir", repo_dir])
    self.mount_a.run_shell([
        'git', 'clone', '--branch', 'giant',
        f'http://github.com/ceph/{repo}', repo_path])

    self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
    self.peer_add(self.primary_fs_name, self.primary_fs_id,
                  "client.mirror_remote@ceph", self.secondary_fs_name)

    self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{repo_path}')
    self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_a'])

    # full copy, takes time
    self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                           "client.mirror_remote@ceph", f'/{repo_path}', 'snap_a', 1)
    self.verify_snapshot(repo_path, 'snap_a')

    # move the work tree back a random number of commits
    num = random.randint(5, 20)
    log.debug(f'resetting to HEAD~{num}')
    exec_git_cmd(["reset", "--hard", f'HEAD~{num}'])

    self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_b'])
    # incremental copy, should be fast
    self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                           "client.mirror_remote@ceph", f'/{repo_path}', 'snap_b', 2)
    self.verify_snapshot(repo_path, 'snap_b')

    # diff again, this time back to HEAD
    log.debug('resetting to HEAD')
    exec_git_cmd(["pull"])

    self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_c'])
    # incremental copy, should be fast
    self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                           "client.mirror_remote@ceph", f'/{repo_path}', 'snap_c', 3)
    self.verify_snapshot(repo_path, 'snap_c')

    self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
def test_cephfs_mirror_incremental_sync_with_type_mixup(self):
    """Test incremental snapshot synchronization with file type changes.

    The same filename exists as a different type in subsequent snapshots.
    This verifies that the mirror daemon can identify a file type mismatch
    and still synchronize each snapshot correctly. The types are rotated
    after every snapshot:

                  snap_0   snap_1   snap_2
        file_x    reg      sym      dir
        file_y    dir      reg      sym
        file_z    sym      dir      reg
    """
    log.debug('reconfigure client auth caps')
    # `ceph auth caps` replaces the entity's whole capability set, so the
    # mon/mds caps required to mount the secondary file system have to be
    # restated alongside the osd caps.
    self.mds_cluster.mon_manager.raw_cluster_cmd_result(
        'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
        'mds', 'allow rw',
        'mon', 'allow r',
        'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
            self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
    log.debug(f'mounting filesystem {self.secondary_fs_name}')
    self.mount_b.umount_wait()
    self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name)

    # typs[i] is the type given to fnames[i]; rotated after each snapshot
    typs = deque(['reg', 'dir', 'sym'])
    # `stat -c %F` output expected for each type
    stat_types = {'reg': 'regular file', 'dir': 'directory', 'sym': 'symbolic link'}

    def cleanup_and_create_with_type(dirname, fnames):
        self.mount_a.run_shell_payload(f"rm -rf {dirname}/*")
        for name, t in zip(fnames, typs):
            fname = f'{dirname}/{name}'
            log.debug(f'file: {fname} type: {t}')
            if t == 'reg':
                self.mount_a.run_shell(["touch", fname])
                # non-empty, so stat reports "regular file" rather than
                # "regular empty file"
                self.mount_a.write_file(fname, data=fname)
            elif t == 'dir':
                self.mount_a.run_shell(["mkdir", fname])
            elif t == 'sym':
                # verify ELOOP in mirror daemon
                self.mount_a.run_shell(["ln", "-s", "..", fname])

    def verify_types(dirname, fnames, snap_name):
        # check the types as seen in the synced snapshot on the secondary
        for name, t in zip(fnames, typs):
            st = self.mount_b.run_shell_payload(
                f"stat -c %F {dirname}/.snap/{snap_name}/{name}").stdout.getvalue().strip()
            self.assertEqual(stat_types[t], st)

    self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
    self.peer_add(self.primary_fs_name, self.primary_fs_id,
                  "client.mirror_remote@ceph", self.secondary_fs_name)

    self.mount_a.run_shell(["mkdir", "d0"])
    self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')

    fnames = ['file_x', 'file_y', 'file_z']
    for turns in range(len(typs)):
        snapname = f'snap_{turns}'
        cleanup_and_create_with_type('d0', fnames)
        self.mount_a.run_shell(['mkdir', f'd0/.snap/{snapname}'])
        # earlier snapshots are kept, so the synced count grows by one per turn
        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                               "client.mirror_remote@ceph", '/d0', snapname, turns + 1)
        verify_types('d0', fnames, snapname)
        # give every file a different type in the next snapshot
        typs.rotate(1)

    self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
def test_cephfs_mirror_sync_with_purged_snapshot(self):
    """Test snapshot synchronization in the midst of snapshot deletes.

    The previous snapshot is deleted while the mirror daemon may be working
    out the incremental differences between the current and previous
    snapshot. The mirror daemon should identify the purge and switch to
    using remote comparison to sync the snapshot (in the next iteration, of
    course).
    """
    log.debug('reconfigure client auth caps')
    # `ceph auth caps` replaces the entity's whole capability set, so the
    # mon/mds caps required to mount the secondary file system have to be
    # restated alongside the osd caps.
    self.mds_cluster.mon_manager.raw_cluster_cmd_result(
        'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
        'mds', 'allow rw',
        'mon', 'allow r',
        'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
            self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
    log.debug(f'mounting filesystem {self.secondary_fs_name}')
    self.mount_b.umount_wait()
    self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name)

    repo = 'ceph-qa-suite'
    repo_dir = 'ceph_repo'
    repo_path = f'{repo_dir}/{repo}'

    def exec_git_cmd(cmd_list):
        # Run git from inside the clone. With only --git-dir, git would treat
        # the current directory (the mount root) as the work tree, and
        # `reset --hard` would never touch the cloned files.
        self.mount_a.run_shell(['git', '-C', f'{self.mount_a.mountpoint}/{repo_path}', *cmd_list])

    # create the parent first: `git clone` creates repo_dir itself, after
    # which a plain `mkdir repo_dir` would fail with EEXIST
    self.mount_a.run_shell(["mkdir", repo_dir])
    self.mount_a.run_shell([
        'git', 'clone', '--branch', 'giant',
        f'http://github.com/ceph/{repo}', repo_path])

    self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
    self.peer_add(self.primary_fs_name, self.primary_fs_id,
                  "client.mirror_remote@ceph", self.secondary_fs_name)

    self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{repo_path}')
    self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_a'])

    # full copy, takes time
    self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                           "client.mirror_remote@ceph", f'/{repo_path}', 'snap_a', 1)
    self.verify_snapshot(repo_path, 'snap_a')

    # large enough reset to give the daemon a sizeable diff to compute
    num = random.randint(60, 100)
    log.debug(f'resetting to HEAD~{num}')
    exec_git_cmd(["reset", "--hard", f'HEAD~{num}'])

    self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_b'])
    # purge the previous snapshot while snap_b is being synchronized
    self.mount_a.run_shell(['rmdir', f'{repo_path}/.snap/snap_a'])

    # incremental copy but based on remote dir_root
    self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                           "client.mirror_remote@ceph", f'/{repo_path}', 'snap_b', 2)
    self.verify_snapshot(repo_path, 'snap_b')

    self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
def test_cephfs_mirror_peer_add_primary(self):
    """Adding the primary file system as a peer of the secondary must fail.

    Raises:
        RuntimeError: if the reverse ``peer_add`` succeeds, or fails with an
            errno other than EINVAL.
    """
    self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
    self.peer_add(self.primary_fs_name, self.primary_fs_id,
                  "client.mirror_remote@ceph", self.secondary_fs_name)

    # try adding the primary file system as a peer to secondary file
    # system -- this is expected to be rejected with EINVAL
    try:
        self.peer_add(self.secondary_fs_name, self.secondary_fs_id,
                      "client.mirror_remote@ceph", self.primary_fs_name)
    except CommandFailedError as ce:
        if ce.exitstatus != errno.EINVAL:
            raise RuntimeError('invalid errno when adding a primary file system') from ce
    else:
        raise RuntimeError('adding peer should fail')

    self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
def test_cephfs_mirror_cancel_mirroring_and_readd(self):
    """
    Test adding a directory path for synchronization post removal of already added directory paths

    ... to ensure that synchronization of the newly added directory path functions
    as expected. Note that we schedule three (3) directories for mirroring to ensure
    that all replayer threads (3 by default) in the mirror daemon are busy.
    """
    log.debug('reconfigure client auth caps')
    # `ceph auth caps` replaces the entity's whole capability set, so the
    # mon/mds caps required to mount the secondary file system have to be
    # restated alongside the osd caps.
    self.mds_cluster.mon_manager.raw_cluster_cmd_result(
        'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
        'mds', 'allow rw',
        'mon', 'allow r',
        'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
            self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))

    log.debug(f'mounting filesystem {self.secondary_fs_name}')
    self.mount_b.umount_wait()
    self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name)

    fs_name, fs_id = self.primary_fs_name, self.primary_fs_id
    peer_spec = "client.mirror_remote@ceph"
    dirs = ('d0', 'd1', 'd2')
    # Number of 1024 MB files per directory -- used by both the create and the
    # delete phase so every large file is removed before re-adding.
    # NOTE(review): chosen so the first snapshots are still syncing when the
    # directories are removed; tune if check_peer_snap_in_progress races.
    nr_files = 4

    # create a bunch of files in a directory to snap
    for d in dirs:
        self.mount_a.run_shell(["mkdir", d])
    for i in range(nr_files):
        filename = f'file.{i}'
        for d in dirs:
            self.mount_a.write_n_mb(os.path.join(d, filename), 1024)

    log.debug('enabling mirroring')
    self.enable_mirroring(fs_name, fs_id)
    log.debug('adding directory paths')
    for d in dirs:
        self.add_directory(fs_name, fs_id, f'/{d}')
    self.peer_add(fs_name, fs_id, peer_spec, self.secondary_fs_name)

    # take snapshots
    log.debug('taking snapshots')
    for d in dirs:
        self.mount_a.run_shell(["mkdir", f"{d}/.snap/snap0"])

    log.debug('checking snap in progress')
    for d in dirs:
        self.check_peer_snap_in_progress(fs_name, fs_id, peer_spec, f'/{d}', 'snap0')

    # cancel mirroring while the initial syncs are still running
    for n, d in enumerate(dirs, start=1):
        log.debug(f'removing directories {n}')
        self.remove_directory(fs_name, fs_id, f'/{d}')

    log.debug('removing snapshots')
    for d in dirs:
        self.mount_a.run_shell(["rmdir", f"{d}/.snap/snap0"])

    for i in range(nr_files):
        filename = f'file.{i}'
        log.debug(f'deleting {filename}')
        for d in dirs:
            self.mount_a.run_shell(["rm", "-f", os.path.join(d, filename)])

    log.debug('creating new files...')
    for d in dirs:
        self.mount_a.create_n_files(f'{d}/file', 50, sync=True)

    # re-add the same directory paths; their new snapshots must sync normally
    log.debug('adding directory paths')
    for d in dirs:
        self.add_directory(fs_name, fs_id, f'/{d}')

    log.debug('creating new snapshots...')
    for d in dirs:
        self.mount_a.run_shell(["mkdir", f"{d}/.snap/snap0"])

    for d in dirs:
        self.check_peer_status(fs_name, fs_id, peer_spec, f'/{d}', 'snap0', 1)
        self.verify_snapshot(d, 'snap0')

    self.disable_mirroring(fs_name, fs_id)