]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/test_multi.py
e32d00f0da31740cbeb584388c791ed4f803b5eb
[ceph.git] / ceph / src / test / rgw / test_multi.py
1 import subprocess
2 import os
3 import random
4 import string
5 import argparse
6 import sys
7 import logging
8 try:
9 import configparser
10 except ImportError:
11 import ConfigParser as configparser
12
13 import nose.core
14
15 from rgw_multi import multisite
16 from rgw_multi.zone_rados import RadosZone as RadosZone
17 from rgw_multi.zone_es import ESZone as ESZone
18 from rgw_multi.zone_es import ESZoneConfig as ESZoneConfig
19 from rgw_multi.zone_cloud import CloudZone as CloudZone
20 from rgw_multi.zone_cloud import CloudZoneConfig as CloudZoneConfig
21 from rgw_multi.zone_ps import PSZone as PSZone
22 from rgw_multi.zone_ps import PSZoneConfig as PSZoneConfig
23 from rgw_multi.zone_az import AZone as AZone
24 from rgw_multi.zone_az import AZoneConfig as AZoneConfig
25
26 # make tests from rgw_multi.tests available to nose
27 from rgw_multi.tests import *
28 from rgw_multi.tests_es import *
29 from rgw_multi.tests_ps import *
30 from rgw_multi.tests_az import *
31
32 mstart_path = os.getenv('MSTART_PATH')
33 if mstart_path is None:
34 mstart_path = os.path.normpath(os.path.dirname(os.path.realpath(__file__)) + '/../..') + '/'
35
36 test_path = os.path.normpath(os.path.dirname(os.path.realpath(__file__))) + '/'
37
38 # configure logging for the tests module
39 log = logging.getLogger('rgw_multi.tests')
40
41 def bash(cmd, **kwargs):
42 log.debug('running cmd: %s', ' '.join(cmd))
43 check_retcode = kwargs.pop('check_retcode', True)
44 kwargs['stdout'] = subprocess.PIPE
45 process = subprocess.Popen(cmd, **kwargs)
46 s = process.communicate()[0].decode('utf-8')
47 log.debug('command returned status=%d stdout=%s', process.returncode, s)
48 if check_retcode:
49 assert(process.returncode == 0)
50 return (s, process.returncode)
51
52 class Cluster(multisite.Cluster):
53 """ cluster implementation based on mstart/mrun scripts """
54 def __init__(self, cluster_id):
55 super(Cluster, self).__init__()
56 self.cluster_id = cluster_id
57 self.needs_reset = True
58
59 def admin(self, args = None, **kwargs):
60 """ radosgw-admin command """
61 cmd = [test_path + 'test-rgw-call.sh', 'call_rgw_admin', self.cluster_id]
62 if args:
63 cmd += args
64 cmd += ['--debug-rgw=' + str(kwargs.pop('debug_rgw', 0))]
65 cmd += ['--debug-ms=' + str(kwargs.pop('debug_ms', 0))]
66 if kwargs.pop('read_only', False):
67 cmd += ['--rgw-cache-enabled=false']
68 return bash(cmd, **kwargs)
69
70 def start(self):
71 cmd = [mstart_path + 'mstart.sh', self.cluster_id]
72 env = None
73 if self.needs_reset:
74 env = os.environ.copy()
75 env['CEPH_NUM_MDS'] = '0'
76 cmd += ['-n']
77 # cmd += ['-o']
78 # cmd += ['rgw_cache_enabled=false']
79 bash(cmd, env=env)
80 self.needs_reset = False
81
82 def stop(self):
83 cmd = [mstart_path + 'mstop.sh', self.cluster_id]
84 bash(cmd)
85
86 class Gateway(multisite.Gateway):
87 """ gateway implementation based on mrgw/mstop scripts """
88 def __init__(self, client_id = None, *args, **kwargs):
89 super(Gateway, self).__init__(*args, **kwargs)
90 self.id = client_id
91
92 def start(self, args = None):
93 """ start the gateway """
94 assert(self.cluster)
95 env = os.environ.copy()
96 # to change frontend, set RGW_FRONTEND env variable
97 # e.g. RGW_FRONTEND=civetweb
98 # to run test under valgrind memcheck, set RGW_VALGRIND to 'yes'
99 # e.g. RGW_VALGRIND=yes
100 cmd = [mstart_path + 'mrgw.sh', self.cluster.cluster_id, str(self.port), str(self.ssl_port)]
101 if self.id:
102 cmd += ['-i', self.id]
103 cmd += ['--debug-rgw=20', '--debug-ms=1']
104 if args:
105 cmd += args
106 bash(cmd, env=env)
107
108 def stop(self):
109 """ stop the gateway """
110 assert(self.cluster)
111 cmd = [mstart_path + 'mstop.sh', self.cluster.cluster_id, 'radosgw', str(self.port)]
112 bash(cmd)
113
114 def gen_access_key():
115 return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(16))
116
117 def gen_secret():
118 return ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(32))
119
120 def gen_credentials():
121 return multisite.Credentials(gen_access_key(), gen_secret())
122
123 def cluster_name(cluster_num):
124 return 'c' + str(cluster_num)
125
126 def zonegroup_name(zonegroup_num):
127 return string.ascii_lowercase[zonegroup_num]
128
129 def zone_name(zonegroup_num, zone_num):
130 return zonegroup_name(zonegroup_num) + str(zone_num + 1)
131
132 def gateway_port(zonegroup_num, gateway_num):
133 return 8000 + 100 * zonegroup_num + gateway_num
134
135 def gateway_name(zonegroup_num, zone_num, gateway_num):
136 return zone_name(zonegroup_num, zone_num) + '-' + str(gateway_num + 1)
137
138 def zone_endpoints(zonegroup_num, zone_num, gateways_per_zone):
139 endpoints = []
140 base = gateway_port(zonegroup_num, zone_num * gateways_per_zone)
141 for i in range(0, gateways_per_zone):
142 endpoints.append('http://localhost:' + str(base + i))
143 return endpoints
144
145 def get_log_level(log_level):
146 if log_level >= 20:
147 return logging.DEBUG
148 if log_level >= 10:
149 return logging.INFO
150 if log_level >= 5:
151 return logging.WARN
152 if log_level >= 1:
153 return logging.ERROR
154 return logging.CRITICAL
155
156 def setup_logging(log_level_console, log_file, log_level_file):
157 if log_file:
158 formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
159 fh = logging.FileHandler(log_file)
160 fh.setFormatter(formatter)
161 fh.setLevel(get_log_level(log_level_file))
162 log.addHandler(fh)
163
164 formatter = logging.Formatter('%(levelname)s %(message)s')
165 ch = logging.StreamHandler()
166 ch.setFormatter(formatter)
167 ch.setLevel(get_log_level(log_level_console))
168 log.addHandler(ch)
169 log.setLevel(get_log_level(log_level_console))
170
171 def init(parse_args):
172 cfg = configparser.RawConfigParser({
173 'num_zonegroups': 1,
174 'num_zones': 3,
175 'num_ps_zones': 0,
176 'num_az_zones': 0,
177 'gateways_per_zone': 2,
178 'no_bootstrap': 'false',
179 'log_level': 20,
180 'log_file': None,
181 'file_log_level': 20,
182 'tenant': None,
183 'checkpoint_retries': 60,
184 'checkpoint_delay': 5,
185 'reconfigure_delay': 5,
186 'use_ssl': 'false',
187 })
188 try:
189 path = os.environ['RGW_MULTI_TEST_CONF']
190 except KeyError:
191 path = test_path + 'test_multi.conf'
192
193 try:
194 with open(path) as f:
195 cfg.readfp(f)
196 except:
197 print('WARNING: error reading test config. Path can be set through the RGW_MULTI_TEST_CONF env variable')
198 pass
199
200 parser = argparse.ArgumentParser(
201 description='Run rgw multi-site tests',
202 usage='test_multi [--num-zonegroups <num>] [--num-zones <num>] [--no-bootstrap]')
203
204 section = 'DEFAULT'
205 parser.add_argument('--num-zonegroups', type=int, default=cfg.getint(section, 'num_zonegroups'))
206 parser.add_argument('--num-zones', type=int, default=cfg.getint(section, 'num_zones'))
207 parser.add_argument('--gateways-per-zone', type=int, default=cfg.getint(section, 'gateways_per_zone'))
208 parser.add_argument('--no-bootstrap', action='store_true', default=cfg.getboolean(section, 'no_bootstrap'))
209 parser.add_argument('--log-level', type=int, default=cfg.getint(section, 'log_level'))
210 parser.add_argument('--log-file', type=str, default=cfg.get(section, 'log_file'))
211 parser.add_argument('--file-log-level', type=int, default=cfg.getint(section, 'file_log_level'))
212 parser.add_argument('--tenant', type=str, default=cfg.get(section, 'tenant'))
213 parser.add_argument('--checkpoint-retries', type=int, default=cfg.getint(section, 'checkpoint_retries'))
214 parser.add_argument('--checkpoint-delay', type=int, default=cfg.getint(section, 'checkpoint_delay'))
215 parser.add_argument('--reconfigure-delay', type=int, default=cfg.getint(section, 'reconfigure_delay'))
216 parser.add_argument('--num-ps-zones', type=int, default=cfg.getint(section, 'num_ps_zones'))
217 parser.add_argument('--use-ssl', type=bool, default=cfg.getboolean(section, 'use_ssl'))
218
219
220 es_cfg = []
221 cloud_cfg = []
222 ps_cfg = []
223 az_cfg = []
224
225 for s in cfg.sections():
226 if s.startswith('elasticsearch'):
227 es_cfg.append(ESZoneConfig(cfg, s))
228 elif s.startswith('cloud'):
229 cloud_cfg.append(CloudZoneConfig(cfg, s))
230 elif s.startswith('pubsub'):
231 ps_cfg.append(PSZoneConfig(cfg, s))
232 elif s.startswith('archive'):
233 az_cfg.append(AZoneConfig(cfg, s))
234
235
236 argv = []
237
238 if parse_args:
239 argv = sys.argv[1:]
240
241 args = parser.parse_args(argv)
242 bootstrap = not args.no_bootstrap
243
244 setup_logging(args.log_level, args.log_file, args.file_log_level)
245
246 # start first cluster
247 c1 = Cluster(cluster_name(1))
248 if bootstrap:
249 c1.start()
250 clusters = []
251 clusters.append(c1)
252
253 admin_creds = gen_credentials()
254 admin_user = multisite.User('zone.user')
255
256 user_creds = gen_credentials()
257 user = multisite.User('tester', tenant=args.tenant)
258
259 realm = multisite.Realm('r')
260 if bootstrap:
261 # create the realm on c1
262 realm.create(c1)
263 else:
264 realm.get(c1)
265 period = multisite.Period(realm=realm)
266 realm.current_period = period
267
268 num_es_zones = len(es_cfg)
269 num_cloud_zones = len(cloud_cfg)
270 num_ps_zones_from_conf = len(ps_cfg)
271 num_az_zones = cfg.getint(section, 'num_az_zones')
272
273 num_ps_zones = args.num_ps_zones if num_ps_zones_from_conf == 0 else num_ps_zones_from_conf
274
275 num_zones = args.num_zones + num_es_zones + num_cloud_zones + num_ps_zones + num_az_zones
276
277 use_ssl = cfg.getboolean(section, 'use_ssl')
278
279 if use_ssl and bootstrap:
280 cmd = ['openssl', 'req',
281 '-x509',
282 '-newkey', 'rsa:4096',
283 '-sha256',
284 '-nodes',
285 '-keyout', 'key.pem',
286 '-out', 'cert.pem',
287 '-subj', '/CN=localhost',
288 '-days', '3650']
289 bash(cmd)
290 # append key to cert
291 fkey = open('./key.pem', 'r')
292 if fkey.mode == 'r':
293 fcert = open('./cert.pem', 'a')
294 fcert.write(fkey.read())
295 fcert.close()
296 fkey.close()
297
298 for zg in range(0, args.num_zonegroups):
299 zonegroup = multisite.ZoneGroup(zonegroup_name(zg), period)
300 period.zonegroups.append(zonegroup)
301
302 is_master_zg = zg == 0
303 if is_master_zg:
304 period.master_zonegroup = zonegroup
305
306 for z in range(0, num_zones):
307 is_master = z == 0
308 # start a cluster, or use c1 for first zone
309 cluster = None
310 if is_master_zg and is_master:
311 cluster = c1
312 else:
313 cluster = Cluster(cluster_name(len(clusters) + 1))
314 clusters.append(cluster)
315 if bootstrap:
316 cluster.start()
317 # pull realm configuration from the master's gateway
318 gateway = realm.meta_master_zone().gateways[0]
319 realm.pull(cluster, gateway, admin_creds)
320
321 endpoints = zone_endpoints(zg, z, args.gateways_per_zone)
322 if is_master:
323 if bootstrap:
324 # create the zonegroup on its first zone's cluster
325 arg = []
326 if is_master_zg:
327 arg += ['--master']
328 if len(endpoints): # use master zone's endpoints
329 arg += ['--endpoints', ','.join(endpoints)]
330 zonegroup.create(cluster, arg)
331 else:
332 zonegroup.get(cluster)
333
334 es_zone = (z >= args.num_zones and z < args.num_zones + num_es_zones)
335 cloud_zone = (z >= args.num_zones + num_es_zones and z < args.num_zones + num_es_zones + num_cloud_zones)
336 ps_zone = (z >= args.num_zones + num_es_zones + num_cloud_zones and z < args.num_zones + num_es_zones + num_cloud_zones + num_ps_zones)
337 az_zone = (z >= args.num_zones + num_es_zones + num_cloud_zones + num_ps_zones)
338
339 # create the zone in its zonegroup
340 zone = multisite.Zone(zone_name(zg, z), zonegroup, cluster)
341 if es_zone:
342 zone_index = z - args.num_zones
343 zone = ESZone(zone_name(zg, z), es_cfg[zone_index].endpoint, zonegroup, cluster)
344 elif cloud_zone:
345 zone_index = z - args.num_zones - num_es_zones
346 ccfg = cloud_cfg[zone_index]
347 zone = CloudZone(zone_name(zg, z), ccfg.endpoint, ccfg.credentials, ccfg.source_bucket,
348 ccfg.target_path, zonegroup, cluster)
349 elif ps_zone:
350 zone_index = z - args.num_zones - num_es_zones - num_cloud_zones
351 if num_ps_zones_from_conf == 0:
352 zone = PSZone(zone_name(zg, z), zonegroup, cluster)
353 else:
354 pscfg = ps_cfg[zone_index]
355 zone = PSZone(zone_name(zg, z), zonegroup, cluster,
356 full_sync=pscfg.full_sync, retention_days=pscfg.retention_days)
357 elif az_zone:
358 zone_index = z - args.num_zones - num_es_zones - num_cloud_zones - num_ps_zones
359 zone = AZone(zone_name(zg, z), zonegroup, cluster)
360 else:
361 zone = RadosZone(zone_name(zg, z), zonegroup, cluster)
362
363 if bootstrap:
364 arg = admin_creds.credential_args()
365 if is_master:
366 arg += ['--master']
367 if len(endpoints):
368 arg += ['--endpoints', ','.join(endpoints)]
369 zone.create(cluster, arg)
370 else:
371 zone.get(cluster)
372 zonegroup.zones.append(zone)
373 if is_master:
374 zonegroup.master_zone = zone
375
376 zonegroup.zones_by_type.setdefault(zone.tier_type(), []).append(zone)
377
378 if zone.is_read_only():
379 zonegroup.ro_zones.append(zone)
380 else:
381 zonegroup.rw_zones.append(zone)
382
383 # update/commit the period
384 if bootstrap:
385 period.update(zone, commit=True)
386
387 ssl_port_offset = 1000
388 # start the gateways
389 for g in range(0, args.gateways_per_zone):
390 port = gateway_port(zg, g + z * args.gateways_per_zone)
391 client_id = gateway_name(zg, z, g)
392 gateway = Gateway(client_id, 'localhost', port, cluster, zone,
393 ssl_port = port+ssl_port_offset if use_ssl else 0)
394 if bootstrap:
395 gateway.start()
396 zone.gateways.append(gateway)
397
398 if is_master_zg and is_master:
399 if bootstrap:
400 # create admin user
401 arg = ['--display-name', '"Zone User"', '--system']
402 arg += admin_creds.credential_args()
403 admin_user.create(zone, arg)
404 # create test user
405 arg = ['--display-name', '"Test User"']
406 arg += user_creds.credential_args()
407 user.create(zone, arg)
408 else:
409 # read users and update keys
410 admin_user.info(zone)
411 admin_creds = admin_user.credentials[0]
412 arg = []
413 user.info(zone, arg)
414 user_creds = user.credentials[0]
415
416 if not bootstrap:
417 period.get(c1)
418
419 config = Config(checkpoint_retries=args.checkpoint_retries,
420 checkpoint_delay=args.checkpoint_delay,
421 reconfigure_delay=args.reconfigure_delay,
422 tenant=args.tenant)
423 init_multi(realm, user, config)
424
425 def setup_module():
426 init(False)
427
428 if __name__ == "__main__":
429 init(True)
430