]> git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/cephfs/test_nfs.py
import ceph pacific 16.2.5
[ceph.git] / ceph / qa / tasks / cephfs / test_nfs.py
1 # NOTE: these tests are not yet compatible with vstart_runner.py.
2 import errno
3 import json
4 import time
5 import logging
6 from io import BytesIO
7
8 from tasks.mgr.mgr_test_case import MgrTestCase
9 from teuthology.exceptions import CommandFailedError
10
11 log = logging.getLogger(__name__)
12
13
14 # TODO Add test for cluster update when ganesha can be deployed on multiple ports.
15 class TestNFS(MgrTestCase):
def _cmd(self, *args):
    '''
    Run a cluster command through the mgr cluster's mon manager.
    :param args: Command words, forwarded unchanged to raw_cluster_cmd
    :return: Whatever raw_cluster_cmd returns for the command
    '''
    mon_manager = self.mgr_cluster.mon_manager
    return mon_manager.raw_cluster_cmd(*args)
18
def _nfs_cmd(self, *args):
    '''
    Run an "nfs" subcommand via _cmd.
    :param args: Arguments placed after the leading "nfs" word
    :return: Result of _cmd for the assembled command
    '''
    nfs_args = ("nfs",) + args
    return self._cmd(*nfs_args)
21
def _orch_cmd(self, *args):
    '''
    Run an "orch" subcommand via _cmd.
    :param args: Arguments placed after the leading "orch" word
    :return: Result of _cmd for the assembled command
    '''
    orch_args = ("orch",) + args
    return self._cmd(*orch_args)
24
def _sys_cmd(self, cmd):
    '''
    Run a system command with sudo through self.ctx.cluster.run and return
    what it wrote to stdout. The command's exit status is not checked.
    :param cmd: Command as a list of words; the list is left unmodified
    :return: Captured stdout bytes of the first result, or None if that
             result has no stdout object
    '''
    # Work on a copy so the caller's list is never altered, and do not
    # stack a second 'sudo' when the caller already supplied one.
    argv = list(cmd)
    if argv[:1] != ['sudo']:
        argv.insert(0, 'sudo')
    ret = self.ctx.cluster.run(args=argv, check_status=False,
                               stdout=BytesIO(), stderr=BytesIO())
    stdout = ret[0].stdout
    if stdout:
        return stdout.getvalue()
    return None
31
def setUp(self):
    '''
    Load the nfs mgr module and set the names and the reference export
    block shared by the tests.
    '''
    super(TestNFS, self).setUp()
    self._load_module('nfs')

    self.cluster_id = "test"
    self.export_type = "cephfs"
    self.pseudo_path = "/cephfs"
    self.path = "/"
    self.fs_name = "nfs-cephfs"
    self.expected_name = "nfs.test"

    # FSAL section of the export block expected for the first export
    fsal_block = {
        "name": "CEPH",
        "user_id": "test1",
        "fs_name": self.fs_name,
        "sec_label_xattr": '',
    }
    # Export block compared against the output of the export get/ls commands
    self.sample_export = {
        "export_id": 1,
        "path": self.path,
        "cluster_id": self.cluster_id,
        "pseudo": self.pseudo_path,
        "access_type": "RW",
        "squash": "no_root_squash",
        "security_label": True,
        "protocols": [4],
        "transports": ["TCP"],
        "fsal": fsal_block,
        "clients": [],
    }
63
def _check_nfs_server_status(self):
    '''
    Disable the nfs-server systemd unit if "systemctl status" reports it
    as active.
    '''
    status = self._sys_cmd(['systemctl', 'status', 'nfs-server'])
    if not isinstance(status, bytes):
        return
    if b'Active: active' in status:
        self._disable_nfs()
68
def _disable_nfs(self):
    '''
    Run "systemctl disable nfs-server --now" through _sys_cmd.
    '''
    log.info("Disabling NFS")
    disable_cmd = ['systemctl', 'disable', 'nfs-server', '--now']
    self._sys_cmd(disable_cmd)
72
def _fetch_nfs_status(self):
    '''
    Return the output of "orch ps" filtered on the expected nfs service
    name.
    '''
    service_filter = f'--service_name={self.expected_name}'
    return self._orch_cmd('ps', service_filter)
75
def _check_nfs_cluster_status(self, expected_status, fail_msg):
    '''
    Poll the nfs service status until the expected text shows up.
    :param expected_status: Text that must appear in the status output
    :param fail_msg: Message used to fail the test if it never appears
    '''
    # The ganesha daemon needs some time to be created/deleted, so sleep
    # before every check, 10s longer each round (10, 20, ... 60 seconds).
    for delay in range(10, 61, 10):
        time.sleep(delay)
        if expected_status in self._fetch_nfs_status():
            return
    self.fail(fail_msg)
90
def _check_auth_ls(self, export_id=1, check_in=False):
    '''
    Tests export user id creation or deletion against "auth ls" output.
    :param export_id: Denotes export number
    :param check_in: If true the export's user id must be listed,
                     otherwise it must be absent
    '''
    output = self._cmd('auth', 'ls')
    # Use the same entity name in both branches. The absence check used to
    # look for a 'client-' prefix, which can never match the 'client.'
    # form searched for above, so it always passed.
    client_entity = f'client.{self.cluster_id}{export_id}'
    if check_in:
        self.assertIn(client_entity, output)
    else:
        self.assertNotIn(client_entity, output)
102
def _test_idempotency(self, cmd_func, cmd_args):
    '''
    Check that a command can be run a second time without error. The
    given TestNFS method runs (and verifies) the command first, then the
    same command is issued again and must return 0.
    :param cmd_func: TestNFS method that runs the command the first time
    :param cmd_args: nfs command arguments for the repeated run
    '''
    cmd_func()
    mon_manager = self.mgr_cluster.mon_manager
    if mon_manager.raw_cluster_cmd_result(*cmd_args) != 0:
        self.fail("Idempotency test failed")
115
def _test_create_cluster(self):
    '''
    Deploy a single nfs cluster and wait for it to be reported running.
    '''
    # A running system nfs server is disabled first
    self._check_nfs_server_status()
    create_args = ('cluster', 'create', self.cluster_id)
    self._nfs_cmd(*create_args)
    # Wait for the daemon of service nfs.<cluster_id> to be running
    self._check_nfs_cluster_status('running', 'NFS Ganesha cluster deployment failed')
125
def _test_delete_cluster(self):
    '''
    Remove the nfs cluster and wait until no daemons are reported for it.
    '''
    rm_args = ('cluster', 'rm', self.cluster_id)
    self._nfs_cmd(*rm_args)
    failure = 'NFS Ganesha cluster could not be deleted'
    self._check_nfs_cluster_status('No daemons reported', failure)
133
def _test_list_cluster(self, empty=False):
    '''
    Check the output of "nfs cluster ls". With a deployed cluster the
    output must equal the cluster id, otherwise it must be empty.
    :param empty: If true it denotes no cluster is deployed.
    '''
    expected = '' if empty else self.cluster_id
    listed = self._nfs_cmd('cluster', 'ls').strip()
    self.assertEqual(expected, listed)
146
def _create_export(self, export_id, create_fs=False, extra_cmd=None):
    '''
    Create a single export and verify its user id and rados object.
    :param export_id: Denotes export number
    :param create_fs: If true the filesystem is created first, otherwise
                      it is expected to exist already
    :param extra_cmd: List of arguments appended to the create command;
                      when it is not a list the default pseudo path is
                      appended instead
    '''
    if create_fs:
        self._cmd('fs', 'volume', 'create', self.fs_name)

    tail = extra_cmd if isinstance(extra_cmd, list) else [self.pseudo_path]
    export_cmd = ['nfs', 'export', 'create', 'cephfs', self.fs_name,
                  self.cluster_id, *tail]
    # Runs the nfs export create command
    self._cmd(*export_cmd)

    # The user id of the export must now be listed
    self._check_auth_ls(export_id, check_in=True)

    # The export object must be readable from rados and not be empty
    export_obj = self._sys_cmd(['rados', '-p', 'nfs-ganesha', '-N', self.cluster_id,
                                'get', f'export-{export_id}', '-'])
    if export_obj == b'':
        self.fail("Export cannot be created")
170
def _create_default_export(self):
    '''
    Deploy one nfs cluster, create the filesystem and add export 1 with
    default options.
    '''
    self._test_create_cluster()
    default_export = {'export_id': '1', 'create_fs': True}
    self._create_export(**default_export)
177
def _delete_export(self):
    '''
    Remove the export at the default pseudo path, then run the auth ls
    check with its default arguments.
    '''
    rm_args = ('export', 'rm', self.cluster_id, self.pseudo_path)
    self._nfs_cmd(*rm_args)
    self._check_auth_ls()
184
def _test_list_export(self):
    '''
    Check that the default pseudo path appears in the export listing.
    '''
    raw_listing = self._nfs_cmd('export', 'ls', self.cluster_id)
    exports = json.loads(raw_listing)
    self.assertIn(self.pseudo_path, exports)
191
def _test_list_detailed(self, sub_vol_path):
    '''
    Compare the detailed export listing with the four expected export
    blocks. self.sample_export is modified in place from one comparison
    to the next.
    :param sub_vol_path: Denotes path of subvolume
    '''
    raw_listing = self._nfs_cmd('export', 'ls', self.cluster_id, '--detailed')
    listed = json.loads(raw_listing)
    expected = self.sample_export

    # Export-1 with default values (access type = rw and path = '/')
    self.assertDictEqual(expected, listed[0])

    # Export-2 with r only
    expected.update(export_id=2, pseudo=self.pseudo_path + '1', access_type='RO')
    expected['fsal']['user_id'] = self.cluster_id + '2'
    self.assertDictEqual(expected, listed[1])

    # Export-3 for subvolume with r only
    expected.update(export_id=3, path=sub_vol_path, pseudo=self.pseudo_path + '2')
    expected['fsal']['user_id'] = self.cluster_id + '3'
    self.assertDictEqual(expected, listed[2])

    # Export-4 for subvolume
    expected.update(export_id=4, pseudo=self.pseudo_path + '3', access_type='RW')
    expected['fsal']['user_id'] = self.cluster_id + '4'
    self.assertDictEqual(expected, listed[3])
218
def _get_export(self):
    '''
    Fetch the export at the default pseudo path and return the parsed
    json output.
    '''
    raw_export = self._nfs_cmd('export', 'get', self.cluster_id, self.pseudo_path)
    return json.loads(raw_export)
224
def _test_get_export(self):
    '''
    Check that the fetched export equals the reference export block.
    '''
    fetched = self._get_export()
    self.assertDictEqual(self.sample_export, fetched)
231
def _check_export_obj_deleted(self, conf_obj=False):
    '''
    Fail if export objects (and optionally the config object) are still
    listed in the cluster's rados namespace.
    :param conf_obj: It denotes config object needs to be checked
    '''
    objects = self._sys_cmd(['rados', '-p', 'nfs-ganesha', '-N', self.cluster_id, 'ls'])

    export_left = b'export-' in objects
    conf_left = conf_obj and b'conf-nfs' in objects
    if export_left or conf_left:
        self.fail("Delete export failed")
241
def _get_port_ip_info(self):
    '''
    Return port and ip of the first backend listed by "nfs cluster info"
    for this test's cluster.
    :return: Tuple of (port, ip)
    '''
    # Example output:
    # {'test': {'backend': [{'hostname': 'smithi068', 'ip': '172.21.15.68', 'port': 2049}]}}
    cluster_info = json.loads(self._nfs_cmd('cluster', 'info', self.cluster_id))
    # Index by the configured cluster id rather than a hard-coded 'test'
    backend = cluster_info[self.cluster_id]['backend'][0]
    return backend["port"], backend["ip"]
249
def _test_mnt(self, pseudo_path, port, ip, check=True):
    '''
    Mount an export on /mnt, create a file in it and check the listing.
    /mnt is always unmounted again once the mount succeeded.
    :param pseudo_path: It is the pseudo root name
    :param port: Port of deployed nfs cluster
    :param ip: IP of deployed nfs cluster
    :param check: When false, a mount failure with exit status 32 is
                  accepted and the method returns without i/o testing
    '''
    mount_cmd = ['sudo', 'mount', '-t', 'nfs', '-o', f'port={port}',
                 f'{ip}:{pseudo_path}', '/mnt']
    try:
        self.ctx.cluster.run(args=mount_cmd)
    except CommandFailedError as e:
        # Check if mount failed only when non existing pseudo path is passed
        if not check and e.exitstatus == 32:
            return
        raise

    try:
        self.ctx.cluster.run(args=['sudo', 'touch', '/mnt/test'])
        # _sys_cmd prepends sudo itself, so it is not passed again here
        out_mnt = self._sys_cmd(['ls', '/mnt'])
        self.assertEqual(out_mnt, b'test\n')
    finally:
        self.ctx.cluster.run(args=['sudo', 'umount', '/mnt'])
273
def _write_to_read_only_export(self, pseudo_path, port, ip):
    '''
    Check if write to read only export fails. The mount/write helper must
    raise CommandFailedError with exit status EPERM; any other exit
    status is re-raised, and a write that succeeds fails the test.
    :param pseudo_path: It is the pseudo root name
    :param port: Port of deployed nfs cluster
    :param ip: IP of deployed nfs cluster
    '''
    try:
        self._test_mnt(pseudo_path, port, ip)
    except CommandFailedError as e:
        # Write to cephfs export should fail for test to pass
        if e.exitstatus != errno.EPERM:
            raise
    else:
        # Previously a successful write went unnoticed and the test passed
        self.fail("Write to read-only export succeeded")
284
def test_create_and_delete_cluster(self):
    '''
    Create the nfs cluster, check it is listed, delete it and check the
    listing is empty again.
    '''
    self._test_create_cluster()
    self._test_list_cluster()
    self._test_delete_cluster()
    # After deletion the listing must not show any cluster
    self._test_list_cluster(empty=True)
294
def test_create_delete_cluster_idempotency(self):
    '''
    Run cluster create and cluster rm twice each; the repeated command
    must still succeed.
    '''
    create_again = ['nfs', 'cluster', 'create', self.cluster_id]
    remove_again = ['nfs', 'cluster', 'rm', self.cluster_id]
    self._test_idempotency(self._test_create_cluster, create_again)
    self._test_idempotency(self._test_delete_cluster, remove_again)
301
def test_create_cluster_with_invalid_cluster_id(self):
    '''
    Cluster creation with an invalid cluster id must fail with EINVAL.
    '''
    invalid_cluster_id = '/cluster_test'  # Only [A-Za-z0-9-_.] chars are valid
    try:
        self._nfs_cmd('cluster', 'create', invalid_cluster_id)
    except CommandFailedError as e:
        # The command is expected to fail; other exit codes are errors
        if e.exitstatus != errno.EINVAL:
            raise
    else:
        self.fail(f"Cluster successfully created with invalid cluster id {invalid_cluster_id}")
314
def test_create_and_delete_export(self):
    '''
    Create the default cephfs export, mount it, delete it and verify it
    is gone.
    '''
    self._create_default_export()
    self._test_get_export()
    port, ip = self._get_port_ip_info()
    mount_target = (self.pseudo_path, port, ip)
    self._test_mnt(*mount_target)
    self._delete_export()
    # The rados export object must be gone after the delete
    self._check_export_obj_deleted()
    # Mount again with check disabled (last positional argument)
    self._test_mnt(*mount_target, False)
    self._test_delete_cluster()
328
def test_create_delete_export_idempotency(self):
    '''
    Run export create and export rm twice each; the repeated command must
    still succeed.
    '''
    create_again = ['nfs', 'export', 'create', 'cephfs', self.fs_name,
                    self.cluster_id, self.pseudo_path]
    remove_again = ['nfs', 'export', 'rm', self.cluster_id, self.pseudo_path]
    self._test_idempotency(self._create_default_export, create_again)
    self._test_idempotency(self._delete_export, remove_again)
    self._test_delete_cluster()
339
def test_create_multiple_exports(self):
    '''
    Create four exports with different access types and paths, verify
    the detailed listing and clean up.
    '''
    pseudo = self.pseudo_path
    # Export-1 with default values (access type = rw and path = '/')
    self._create_default_export()
    # Export-2 with r only
    self._create_export(export_id='2', extra_cmd=[pseudo + '1', '--readonly'])
    # Export-3 for subvolume with r only
    self._cmd('fs', 'subvolume', 'create', self.fs_name, 'sub_vol')
    fs_path = self._cmd('fs', 'subvolume', 'getpath', self.fs_name, 'sub_vol').strip()
    self._create_export(export_id='3', extra_cmd=[pseudo + '2', '--readonly', fs_path])
    # Export-4 for subvolume
    self._create_export(export_id='4', extra_cmd=[pseudo + '3', fs_path])
    # All four exports must show up in the detailed listing
    self._test_list_detailed(fs_path)
    self._test_delete_cluster()
    # Export objects and the ganesha conf object must be gone
    self._check_export_obj_deleted(conf_obj=True)
    self._check_auth_ls()
360
def test_exports_on_mgr_restart(self):
    '''
    Check that an export is still listed and mountable after the cephadm
    module is unloaded and loaded again.
    '''
    self._create_default_export()
    # unload and load module will restart the mgr
    module = "cephadm"
    self._unload_module(module)
    self._load_module(module)
    self._orch_cmd("set", "backend", module)
    # The ganesha daemon must be reported running again
    self._check_nfs_cluster_status('running', 'Failed to redeploy NFS Ganesha cluster')
    # The export created before the restart must still be listed
    self._test_list_export()
    port, ip = self._get_port_ip_info()
    self._test_mnt(self.pseudo_path, port, ip)
    self._delete_export()
    self._test_delete_cluster()
378
def test_export_create_with_non_existing_fsname(self):
    '''
    Export creation for a filesystem that does not exist must fail with
    ENOENT. The cluster is deleted in every case.
    '''
    fs_name = 'nfs-test'
    try:
        self._test_create_cluster()
        self._nfs_cmd('export', 'create', 'cephfs', fs_name, self.cluster_id, self.pseudo_path)
    except CommandFailedError as e:
        # The command is expected to fail; other exit codes are errors
        if e.exitstatus != errno.ENOENT:
            raise
    else:
        self.fail(f"Export created with non-existing filesystem {fs_name}")
    finally:
        self._test_delete_cluster()
394
def test_export_create_with_non_existing_clusterid(self):
    '''
    Export creation for an nfs cluster that does not exist must fail with
    ENOENT.
    '''
    cluster_id = 'invalidtest'
    try:
        self._nfs_cmd('export', 'create', 'cephfs', self.fs_name, cluster_id, self.pseudo_path)
    except CommandFailedError as e:
        # The command is expected to fail; other exit codes are errors
        if e.exitstatus != errno.ENOENT:
            raise
    else:
        self.fail(f"Export created with non-existing cluster id {cluster_id}")
407
def test_export_create_with_relative_pseudo_path_and_root_directory(self):
    '''
    Export creation with a relative or '/' pseudo path must fail with
    EINVAL.
    '''
    def check_pseudo_path(pseudo_path):
        # Try to create an export at the given pseudo path; it must be
        # rejected with EINVAL.
        try:
            self._nfs_cmd('export', 'create', 'cephfs', self.fs_name, self.cluster_id,
                          pseudo_path)
        except CommandFailedError as e:
            if e.exitstatus != errno.EINVAL:
                raise
        else:
            self.fail(f"Export created for {pseudo_path}")

    self._test_create_cluster()
    self._cmd('fs', 'volume', 'create', self.fs_name)
    for bad_path in ('invalidpath', '/', '//'):
        check_pseudo_path(bad_path)
    self._cmd('fs', 'volume', 'rm', self.fs_name, '--yes-i-really-mean-it')
    self._test_delete_cluster()
429
def test_write_to_read_only_export(self):
    '''
    Create a read-only export and run the write check against it.
    '''
    self._test_create_cluster()
    readonly_args = [self.pseudo_path, '--readonly']
    self._create_export(export_id='1', create_fs=True, extra_cmd=readonly_args)
    port, ip = self._get_port_ip_info()
    self._write_to_read_only_export(self.pseudo_path, port, ip)
    self._test_delete_cluster()
439
def test_cluster_info(self):
    '''
    Test cluster info outputs correct ip and hostname
    '''
    self._test_create_cluster()
    info_output = json.loads(self._nfs_cmd('cluster', 'info', self.cluster_id))
    log.info(f'info {info_output}')
    # The ip is removed from the output and compared separately against
    # the addresses printed by "hostname -I".
    info_ip = info_output[self.cluster_id].get('backend', [])[0].pop("ip")
    host_details = {
        self.cluster_id: {
            'backend': [
                {
                    "hostname": self._sys_cmd(['hostname']).decode("utf-8").strip(),
                    "port": 2049
                }
            ],
            "virtual_ip": None,
        }
    }
    host_ip = self._sys_cmd(['hostname', '-I']).decode("utf-8").split()
    log.info(f'host_ip is {host_ip}, info_ip is {info_ip}')
    self.assertDictEqual(info_output, host_details)
    # assertIn reports both values on failure, unlike assertTrue(x in y)
    self.assertIn(info_ip, host_ip)
    self._test_delete_cluster()
464
def test_cluster_set_reset_user_config(self):
    '''
    Test cluster is created using user config and reverts back to default
    config on reset.
    '''
    self._test_create_cluster()

    pool = 'nfs-ganesha'
    user_id = 'test'
    fs_name = 'user_test_fs'
    pseudo_path = '/ceph'
    self._cmd('fs', 'volume', 'create', fs_name)
    time.sleep(20)
    key = self._cmd('auth', 'get-or-create-key', f'client.{user_id}', 'mon',
                    'allow r', 'osd',
                    f'allow rw pool={pool} namespace={self.cluster_id}, allow rw tag cephfs data={fs_name}',
                    'mds', f'allow rw path={self.path}').strip()
    config = f""" LOG {{
Default_log_level = FULL_DEBUG;
}}

EXPORT {{
Export_Id = 100;
Transports = TCP;
Path = /;
Pseudo = {pseudo_path};
Protocols = 4;
Access_Type = RW;
Attr_Expiration_Time = 0;
Squash = None;
FSAL {{
Name = CEPH;
Filesystem = {fs_name};
User_Id = {user_id};
Secret_Access_Key = '{key}';
}}
}}"""
    port, ip = self._get_port_ip_info()
    self.ctx.cluster.run(args=['sudo', 'ceph', 'nfs', 'cluster', 'config',
                               'set', self.cluster_id, '-i', '-'], stdin=config)
    time.sleep(30)
    # The stored user config object must hold exactly what was passed in
    res = self._sys_cmd(['rados', '-p', pool, '-N', self.cluster_id, 'get',
                         f'userconf-nfs.{user_id}', '-'])
    self.assertEqual(config, res.decode('utf-8'))
    self._test_mnt(pseudo_path, port, ip)
    self._nfs_cmd('cluster', 'config', 'reset', self.cluster_id)
    rados_obj_ls = self._sys_cmd(['rados', '-p', 'nfs-ganesha', '-N', self.cluster_id, 'ls'])
    # b'conf-nfs' is a substring of b'userconf-nfs', so the old combined
    # condition could never be true; test for the user config object alone.
    if b'userconf-nfs' in rados_obj_ls:
        self.fail("User config not deleted")
    time.sleep(30)
    self._test_mnt(pseudo_path, port, ip, False)
    self._cmd('fs', 'volume', 'rm', fs_name, '--yes-i-really-mean-it')
    self._test_delete_cluster()
518
def test_cluster_set_user_config_with_non_existing_clusterid(self):
    '''
    Test setting user config for non-existing nfs cluster.
    '''
    try:
        cluster_id = 'invalidtest'
        # Pass the non-existing id itself; self.cluster_id was sent before,
        # so the id named in the failure message was never exercised.
        self.ctx.cluster.run(args=['sudo', 'ceph', 'nfs', 'cluster',
                                   'config', 'set', cluster_id, '-i', '-'], stdin='testing')
        self.fail(f"User config set for non-existing cluster {cluster_id}")
    except CommandFailedError as e:
        # Command should fail for test to pass
        if e.exitstatus != errno.ENOENT:
            raise
532
def test_cluster_reset_user_config_with_non_existing_clusterid(self):
    '''
    Resetting the user config of an nfs cluster that does not exist must
    fail with ENOENT.
    '''
    cluster_id = 'invalidtest'
    try:
        self._nfs_cmd('cluster', 'config', 'reset', cluster_id)
    except CommandFailedError as e:
        # The command is expected to fail; other exit codes are errors
        if e.exitstatus != errno.ENOENT:
            raise
    else:
        self.fail(f"User config reset for non-existing cluster {cluster_id}")
545
def test_update_export(self):
    '''
    Update the pseudo path and access type of the default export and run
    the read-only write check on the new pseudo path.
    '''
    self._create_default_export()
    port, ip = self._get_port_ip_info()
    self._test_mnt(self.pseudo_path, port, ip)

    new_pseudo_path = '/testing'
    export_block = self._get_export()
    export_block.update(pseudo=new_pseudo_path, access_type='RO')
    update_cmd = ['sudo', 'ceph', 'nfs', 'export', 'update', '-i', '-']
    self.ctx.cluster.run(args=update_cmd, stdin=json.dumps(export_block))

    self._check_nfs_cluster_status('running', 'NFS Ganesha cluster restart failed')
    self._write_to_read_only_export(new_pseudo_path, port, ip)
    self._test_delete_cluster()
562
def test_update_export_with_invalid_values(self):
    '''
    Test update of export with invalid values
    '''
    self._create_default_export()
    export_block = self._get_export()

    def update_with_invalid_values(key, value, fsal=False):
        # Send a copy of the export block with one value replaced. A
        # failing command is tolerated; the outcome is logged either way
        # so a run shows which values were actually rejected.
        export_block_new = dict(export_block)
        if fsal:
            # Copy the nested dict so the fetched export block stays intact
            export_block_new['fsal'] = dict(export_block['fsal'])
            export_block_new['fsal'][key] = value
        else:
            export_block_new[key] = value
        try:
            self.ctx.cluster.run(args=['sudo', 'ceph', 'nfs', 'export', 'update', '-i', '-'],
                                 stdin=json.dumps(export_block_new))
        except CommandFailedError as e:
            log.info(f"Export update with {key}={value!r} rejected, exit status {e.exitstatus}")
        else:
            log.warning(f"Export update with {key}={value!r} was not rejected")

    update_with_invalid_values('export_id', 9)
    update_with_invalid_values('cluster_id', 'testing_new')
    update_with_invalid_values('pseudo', 'test_relpath')
    update_with_invalid_values('access_type', 'W')
    update_with_invalid_values('squash', 'no_squash')
    update_with_invalid_values('security_label', 'invalid')
    update_with_invalid_values('protocols', [2])
    update_with_invalid_values('transports', ['UD'])
    update_with_invalid_values('name', 'RGW', True)
    update_with_invalid_values('user_id', 'testing_export', True)
    update_with_invalid_values('fs_name', 'b', True)
    self._test_delete_cluster()
595
596 def test_cmds_without_reqd_args(self):
597 '''
598 Test that cmd fails on not passing required arguments
599 '''
600 def exec_cmd_invalid(*cmd):
601 try:
602 self._nfs_cmd(*cmd)
603 self.fail(f"nfs {cmd} command executed successfully without required arguments")
604 except CommandFailedError as e:
605 # Command should fail for test to pass
606 if e.exitstatus != errno.EINVAL:
607 raise
608
609 exec_cmd_invalid('cluster', 'create')
610 exec_cmd_invalid('cluster', 'delete')
611 exec_cmd_invalid('cluster', 'config', 'set')
612 exec_cmd_invalid('cluster', 'config', 'reset')
613 exec_cmd_invalid('export', 'create', 'cephfs')
614 exec_cmd_invalid('export', 'create', 'cephfs', 'a_fs')
615 exec_cmd_invalid('export', 'create', 'cephfs', 'a_fs', 'clusterid')
616 exec_cmd_invalid('export', 'ls')
617 exec_cmd_invalid('export', 'delete')
618 exec_cmd_invalid('export', 'delete', 'clusterid')
619 exec_cmd_invalid('export', 'get')
620 exec_cmd_invalid('export', 'get', 'clusterid')