import json
import random
import string
import sys
import time
import logging

try:
    from itertools import izip_longest as zip_longest
except ImportError:
    from itertools import zip_longest
from itertools import combinations
from cStringIO import StringIO

import boto
import boto.s3.connection
from boto.s3.website import WebsiteConfiguration
from boto.s3.cors import CORSConfiguration

from nose.tools import eq_ as eq
from nose.plugins.attrib import attr
from nose.plugins.skip import SkipTest

from .multisite import Zone

from .conn import get_gateway_connection

class Config:
    """ test configuration """
    def __init__(self, **kwargs):
        # by default, wait up to 5 minutes before giving up on a sync checkpoint
        self.checkpoint_retries = kwargs.get('checkpoint_retries', 60)
        self.checkpoint_delay = kwargs.get('checkpoint_delay', 5)
        # allow some time for realm reconfiguration after changing master zone
        self.reconfigure_delay = kwargs.get('reconfigure_delay', 5)

# rgw multisite tests, written against the interfaces provided in rgw_multi.
# these tests must be initialized and run by another module that provides
# implementations of these interfaces by calling init_multi()
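#
# a minimal driver sketch (hypothetical wiring; the real harness, e.g.
# test_multi.py, builds these objects from running clusters):
#
#   from rgw_multi import multisite, tests
#   realm = ...   # a multisite.Realm describing the zonegroups/zones
#   user = ...    # a multisite.User carrying the S3 credentials
#   tests.init_multi(realm, user, tests.Config(checkpoint_retries=30))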
realm = None
user = None
config = None
def init_multi(_realm, _user, _config=None):
    global realm
    realm = _realm
    global user
    user = _user
    global config
    config = _config or Config()
    realm_meta_checkpoint(realm)

def get_realm():
    return realm

log = logging.getLogger(__name__)

num_buckets = 0
run_prefix = ''.join(random.choice(string.ascii_lowercase) for _ in range(6))

# note: this local definition shadows the get_gateway_connection imported
# from .conn above
def get_gateway_connection(gateway, credentials):
    """ connect to the given gateway """
    if gateway.connection is None:
        gateway.connection = boto.connect_s3(
            aws_access_key_id = credentials.access_key,
            aws_secret_access_key = credentials.secret,
            host = gateway.host,
            port = gateway.port,
            is_secure = False,
            calling_format = boto.s3.connection.OrdinaryCallingFormat())
    return gateway.connection

def get_zone_connection(zone, credentials):
    """ connect to the zone's first gateway """
    if isinstance(credentials, list):
        credentials = credentials[0]
    return get_gateway_connection(zone.gateways[0], credentials)

def mdlog_list(zone, period = None):
    cmd = ['mdlog', 'list']
    if period:
        cmd += ['--period', period]
    (mdlog_json, _) = zone.cluster.admin(cmd, read_only=True)
    mdlog_json = mdlog_json.decode('utf-8')
    return json.loads(mdlog_json)

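# zone.cluster.admin() runs a radosgw-admin command against that zone's
# cluster, so mdlog_list() is roughly equivalent to:
#   radosgw-admin mdlog list [--period <period-id>]
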
# note: shadowed by the full meta_sync_status() defined below, which also
# parses and returns the status
def meta_sync_status(zone):
    while True:
        cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
        meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break
        assert(retcode == 2) # ENOENT
        time.sleep(5)

def mdlog_autotrim(zone):
    zone.cluster.admin(['mdlog', 'autotrim'])

def bilog_list(zone, bucket, args = None):
    cmd = ['bilog', 'list', '--bucket', bucket] + (args or [])
    bilog, _ = zone.cluster.admin(cmd, read_only=True)
    bilog = bilog.decode('utf-8')
    return json.loads(bilog)

def bilog_autotrim(zone, args = None):
    zone.cluster.admin(['bilog', 'autotrim'] + (args or []))

def parse_meta_sync_status(meta_sync_status_json):
    meta_sync_status_json = meta_sync_status_json.decode('utf-8')
    log.debug('current meta sync status=%s', meta_sync_status_json)
    sync_status = json.loads(meta_sync_status_json)

    sync_info = sync_status['sync_status']['info']
    global_sync_status = sync_info['status']
    num_shards = sync_info['num_shards']
    period = sync_info['period']
    realm_epoch = sync_info['realm_epoch']

    sync_markers = sync_status['sync_status']['markers']
    log.debug('sync_markers=%s', sync_markers)
    assert(num_shards == len(sync_markers))

    markers = {}
    for i in range(num_shards):
        # get marker, only if it's an incremental marker for the same realm epoch
        if realm_epoch > sync_markers[i]['val']['realm_epoch'] or sync_markers[i]['val']['state'] == 0:
            markers[i] = ''
        else:
            markers[i] = sync_markers[i]['val']['marker']

    return period, realm_epoch, num_shards, markers

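# for reference, parse_meta_sync_status() above expects JSON shaped roughly
# like the following (field names taken from the accesses above; values are
# illustrative only):
#
#   { "sync_status": {
#       "info": { "status": "...", "num_shards": N,
#                 "period": "...", "realm_epoch": E },
#       "markers": [ { "key": 0,
#                      "val": { "state": S, "realm_epoch": E,
#                               "marker": "..." } }, ... ] } }
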
def meta_sync_status(zone):
    for _ in range(config.checkpoint_retries):
        cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
        meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            return parse_meta_sync_status(meta_sync_status_json)
        assert(retcode == 2) # ENOENT
        time.sleep(config.checkpoint_delay)

    assert False, 'failed to read metadata sync status for zone=%s' % zone.name

def meta_master_log_status(master_zone):
    cmd = ['mdlog', 'status'] + master_zone.zone_args()
    mdlog_status_json, retcode = master_zone.cluster.admin(cmd, read_only=True)
    mdlog_status = json.loads(mdlog_status_json.decode('utf-8'))

    markers = {i: s['marker'] for i, s in enumerate(mdlog_status)}
    log.debug('master meta markers=%s', markers)
    return markers

def compare_meta_status(zone, log_status, sync_status):
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d, len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg) > 0:
        log.warning('zone %s behind master: %s', zone.name, msg)
        return False

    return True

def zone_meta_checkpoint(zone, meta_master_zone = None, master_status = None):
    if not meta_master_zone:
        meta_master_zone = zone.realm().meta_master_zone()
    if not master_status:
        master_status = meta_master_log_status(meta_master_zone)

    current_realm_epoch = realm.current_period.data['realm_epoch']

    log.info('starting meta checkpoint for zone=%s', zone.name)

    for _ in range(config.checkpoint_retries):
        period, realm_epoch, num_shards, sync_status = meta_sync_status(zone)
        if realm_epoch < current_realm_epoch:
            log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d',
                        zone.name, realm_epoch, current_realm_epoch)
        else:
            log.debug('log_status=%s', master_status)
            log.debug('sync_status=%s', sync_status)
            if compare_meta_status(zone, master_status, sync_status):
                log.info('finished meta checkpoint for zone=%s', zone.name)
                return

        time.sleep(config.checkpoint_delay)
    assert False, 'failed meta checkpoint for zone=%s' % zone.name

def zonegroup_meta_checkpoint(zonegroup, meta_master_zone = None, master_status = None):
    if not meta_master_zone:
        meta_master_zone = zonegroup.realm().meta_master_zone()
    if not master_status:
        master_status = meta_master_log_status(meta_master_zone)

    for zone in zonegroup.zones:
        if zone == meta_master_zone:
            continue
        zone_meta_checkpoint(zone, meta_master_zone, master_status)

def realm_meta_checkpoint(realm):
    log.info('meta checkpoint')

    meta_master_zone = realm.meta_master_zone()
    master_status = meta_master_log_status(meta_master_zone)

    for zonegroup in realm.current_period.zonegroups:
        zonegroup_meta_checkpoint(zonegroup, meta_master_zone, master_status)

def parse_data_sync_status(data_sync_status_json):
    data_sync_status_json = data_sync_status_json.decode('utf-8')
    log.debug('current data sync status=%s', data_sync_status_json)
    sync_status = json.loads(data_sync_status_json)

    global_sync_status = sync_status['sync_status']['info']['status']
    num_shards = sync_status['sync_status']['info']['num_shards']

    sync_markers = sync_status['sync_status']['markers']
    log.debug('sync_markers=%s', sync_markers)
    assert(num_shards == len(sync_markers))

    markers = {}
    for i in range(num_shards):
        markers[i] = sync_markers[i]['val']['marker']

    return (num_shards, markers)

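# note: the JSON consumed here has the same overall shape sketched after
# parse_meta_sync_status(), but only 'info.num_shards' and each shard's
# 'val.marker' are used; there is no period/realm_epoch handling
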
def data_sync_status(target_zone, source_zone):
    if target_zone == source_zone:
        return None

    for _ in range(config.checkpoint_retries):
        cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
        cmd += ['--source-zone', source_zone.name]
        data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            return parse_data_sync_status(data_sync_status_json)

        assert(retcode == 2) # ENOENT
        time.sleep(config.checkpoint_delay)

    assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \
                  (target_zone.name, source_zone.name)

def bucket_sync_status(target_zone, source_zone, bucket_name):
    if target_zone == source_zone:
        return None

    cmd = ['bucket', 'sync', 'status'] + target_zone.zone_args()
    cmd += ['--source-zone', source_zone.name]
    cmd += ['--bucket', bucket_name]
    while True:
        bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break

        assert(retcode == 2) # ENOENT

    bucket_sync_status_json = bucket_sync_status_json.decode('utf-8')
    log.debug('current bucket sync status=%s', bucket_sync_status_json)
    sync_status = json.loads(bucket_sync_status_json)

    markers = {}
    for entry in sync_status:
        val = entry['val']
        if val['status'] == 'incremental-sync':
            pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
        else:
            pos = ''
        markers[entry['key']] = pos

    return markers

def data_source_log_status(source_zone):
    source_cluster = source_zone.cluster
    cmd = ['datalog', 'status'] + source_zone.zone_args()
    datalog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
    datalog_status = json.loads(datalog_status_json.decode('utf-8'))

    markers = {i: s['marker'] for i, s in enumerate(datalog_status)}
    log.debug('data markers for zone=%s markers=%s', source_zone.name, markers)
    return markers

def bucket_source_log_status(source_zone, bucket_name):
    cmd = ['bilog', 'status'] + source_zone.zone_args()
    cmd += ['--bucket', bucket_name]
    source_cluster = source_zone.cluster
    bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
    bilog_status = json.loads(bilog_status_json.decode('utf-8'))

    m = {}
    markers = {}
    try:
        m = bilog_status['markers']
    except KeyError:
        pass

    for s in m:
        key = s['key']
        val = s['val']
        markers[key] = val

    log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers)
    return markers

def compare_data_status(target_zone, source_zone, log_status, sync_status):
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg) > 0:
        log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, msg)
        return False

    return True

def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg) > 0:
        log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, msg)
        return False

    return True

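# the three compare_* helpers above all rely on markers being
# lexicographically ordered strings, so plain '>' works; e.g. a master
# marker '00000000003.140.3' compares greater than a target marker
# '00000000002.132.3', flagging that shard as behind
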
def zone_data_checkpoint(target_zone, source_zone):
    if target_zone == source_zone:
        return

    log_status = data_source_log_status(source_zone)
    log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)

    for _ in range(config.checkpoint_retries):
        num_shards, sync_status = data_sync_status(target_zone, source_zone)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        if compare_data_status(target_zone, source_zone, log_status, sync_status):
            log.info('finished data checkpoint for target_zone=%s source_zone=%s',
                     target_zone.name, source_zone.name)
            return
        time.sleep(config.checkpoint_delay)

    assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
                  (target_zone.name, source_zone.name)

def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
    if target_zone == source_zone:
        return

    log_status = bucket_source_log_status(source_zone, bucket_name)
    log.info('starting bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)

    for _ in range(config.checkpoint_retries):
        sync_status = bucket_sync_status(target_zone, source_zone, bucket_name)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        if compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
            log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
            return

        time.sleep(config.checkpoint_delay)

    assert False, 'failed bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \
                  (target_zone.name, source_zone.name, bucket_name)

def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
    for source_conn in zonegroup_conns.zones:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket_name)
            target_conn.check_bucket_eq(source_conn, bucket_name)

def set_master_zone(zone):
    zone.modify(zone.cluster, ['--master'])
    zonegroup = zone.zonegroup
    zonegroup.period.update(zone, commit=True)
    zonegroup.master_zone = zone
    log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay)
    time.sleep(config.reconfigure_delay)

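# set_master_zone() mirrors the manual failover procedure, roughly:
#   radosgw-admin zone modify --rgw-zone=<name> --master
#   radosgw-admin period update --commit
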
def enable_bucket_sync(zone, bucket_name):
    cmd = ['bucket', 'sync', 'enable', '--bucket', bucket_name] + zone.zone_args()
    zone.cluster.admin(cmd)

def disable_bucket_sync(zone, bucket_name):
    cmd = ['bucket', 'sync', 'disable', '--bucket', bucket_name] + zone.zone_args()
    zone.cluster.admin(cmd)

def check_buckets_sync_status_obj_not_exist(zone, buckets):
    for _ in range(config.checkpoint_retries):
        cmd = ['log', 'list'] + zone.zone_args()
        log_list, ret = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        for bucket in buckets:
            if log_list.find(':'+bucket+":") >= 0:
                break
        else:
            return
        time.sleep(config.checkpoint_delay)
    assert False, 'sync status objects still exist for buckets=%s on zone=%s' % (buckets, zone.name)

def gen_bucket_name():
    global num_buckets

    num_buckets += 1
    return run_prefix + '-' + str(num_buckets)

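# e.g. with run_prefix 'kfsmqv' (randomized per test run), successive calls
# return 'kfsmqv-1', 'kfsmqv-2', ..., keeping bucket names unique within a run
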
class ZonegroupConns:
    def __init__(self, zonegroup):
        self.zonegroup = zonegroup
        self.zones = []
        self.ro_zones = []
        self.rw_zones = []
        self.master_zone = None
        for z in zonegroup.zones:
            zone_conn = z.get_conn(user.credentials)
            self.zones.append(zone_conn)
            if z.is_read_only():
                self.ro_zones.append(zone_conn)
            else:
                self.rw_zones.append(zone_conn)

            if z == zonegroup.master_zone:
                self.master_zone = zone_conn

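# typical use in the tests below (a sketch):
#   zonegroup_conns = ZonegroupConns(realm.master_zonegroup())
#   for conn in zonegroup_conns.rw_zones:  # writable zones only
#       bucket = conn.create_bucket(gen_bucket_name())
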
def check_all_buckets_exist(zone_conn, buckets):
    if not zone_conn.zone.has_buckets():
        return True

    for b in buckets:
        try:
            zone_conn.get_bucket(b)
        except:
            log.critical('zone %s does not contain bucket %s', zone_conn.name, b)
            return False

    return True

def check_all_buckets_dont_exist(zone_conn, buckets):
    if not zone_conn.zone.has_buckets():
        return True

    for b in buckets:
        try:
            zone_conn.get_bucket(b)
        except:
            continue

        log.critical('zone %s contains bucket %s', zone_conn.name, b)
        return False

    return True

def create_bucket_per_zone(zonegroup_conns, buckets_per_zone = 1):
    buckets = []
    zone_bucket = []
    for zone in zonegroup_conns.rw_zones:
        for i in xrange(buckets_per_zone):
            bucket_name = gen_bucket_name()
            log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
            bucket = zone.create_bucket(bucket_name)
            buckets.append(bucket_name)
            zone_bucket.append((zone, bucket))

    return buckets, zone_bucket

def create_bucket_per_zone_in_realm():
    buckets = []
    zone_bucket = []
    for zonegroup in realm.current_period.zonegroups:
        zg_conn = ZonegroupConns(zonegroup)
        b, z = create_bucket_per_zone(zg_conn)
        buckets.extend(b)
        zone_bucket.extend(z)
    return buckets, zone_bucket

def test_bucket_create():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, _ = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

def test_bucket_recreate():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, _ = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

    # recreate buckets on all zones, make sure they weren't removed
    for zone in zonegroup_conns.rw_zones:
        for bucket_name in buckets:
            bucket = zone.create_bucket(bucket_name)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

def test_bucket_remove():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

    for zone, bucket_name in zone_bucket:
        zone.conn.delete_bucket(bucket_name)

    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_dont_exist(zone, buckets)

def get_bucket(zone, bucket_name):
    return zone.conn.get_bucket(bucket_name)

def get_key(zone, bucket_name, obj_name):
    b = get_bucket(zone, bucket_name)
    return b.get_key(obj_name)

def new_key(zone, bucket_name, obj_name):
    b = get_bucket(zone, bucket_name)
    return b.new_key(obj_name)

def check_bucket_eq(zone_conn1, zone_conn2, bucket):
    return zone_conn2.check_bucket_eq(zone_conn1, bucket.name)

def test_object_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = [ 'myobj', '_myobj', ':', '&' ]
    content = 'asdasd'

    # don't wait for meta sync just yet
    for zone, bucket_name in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket_name, objname)
            k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

def test_object_delete():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objname = 'myobj'
    content = 'asdasd'

    # don't wait for meta sync just yet
    for zone, bucket in zone_bucket:
        k = new_key(zone, bucket, objname)
        k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    # check object exists
    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

    # check object removal
    for source_conn, bucket in zone_bucket:
        k = get_key(source_conn, bucket, objname)
        k.delete()
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

def get_latest_object_version(key):
    for k in key.bucket.list_versions(key.name):
        if k.is_latest:
            return k
    return None

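# usage sketch: right after k.set_contents_from_string(...), the id of the
# new current version can be read back as
# get_latest_object_version(k).version_id, as the versioning test below does
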
def test_versioned_object_incremental_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    # enable versioning
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)

    zonegroup_meta_checkpoint(zonegroup)

    # upload a dummy object to each bucket and wait for sync. this forces each
    # bucket to finish a full sync and switch to incremental
    for source_conn, bucket in zone_bucket:
        new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)

    for _, bucket in zone_bucket:
        # create and delete multiple versions of an object from each zone
        for zone_conn in zonegroup_conns.rw_zones:
            obj = 'obj-' + zone_conn.name
            k = new_key(zone_conn, bucket, obj)

            k.set_contents_from_string('version1')
            v = get_latest_object_version(k)
            log.debug('version1 id=%s', v.version_id)
            # don't delete version1 - this tests that the initial version
            # doesn't get squashed into later versions

            # create and delete the following object versions to test that
            # the operations don't race with each other during sync
            k.set_contents_from_string('version2')
            v = get_latest_object_version(k)
            log.debug('version2 id=%s', v.version_id)
            k.bucket.delete_key(obj, version_id=v.version_id)

            k.set_contents_from_string('version3')
            v = get_latest_object_version(k)
            log.debug('version3 id=%s', v.version_id)
            k.bucket.delete_key(obj, version_id=v.version_id)

    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

def test_bucket_versioning():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)
        res = bucket.get_versioning_status()
        key = 'Versioning'
        assert(key in res and res[key] == 'Enabled')

def test_bucket_acl():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
        bucket.set_acl('public-read')
        assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers

def test_bucket_cors():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        cors_cfg = CORSConfiguration()
        cors_cfg.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000)
        bucket.set_cors(cors_cfg)
        assert(bucket.get_cors().to_xml() == cors_cfg.to_xml())

def test_bucket_delete_notempty():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone_conn, bucket_name in zone_bucket:
        # upload an object to each bucket on its own zone
        conn = zone_conn.get_connection()
        bucket = conn.get_bucket(bucket_name)
        k = bucket.new_key('foo')
        k.set_contents_from_string('bar')
        # attempt to delete the bucket before this object can sync
        try:
            conn.delete_bucket(bucket_name)
        except boto.exception.S3ResponseError as e:
            assert(e.error_code == 'BucketNotEmpty')
            continue
        assert False # expected 409 BucketNotEmpty

    # assert that each bucket still exists on the master
    c1 = zonegroup_conns.master_zone.conn
    for _, bucket_name in zone_bucket:
        assert c1.get_bucket(bucket_name)

def test_multi_period_incremental_sync():
    zonegroup = realm.master_zonegroup()
    if len(zonegroup.zones) < 3:
        raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")

    # periods to include in mdlog comparison
    mdlog_periods = [realm.current_period.id]

    # create a bucket in each zone
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    zonegroup_meta_checkpoint(zonegroup)

    z1, z2, z3 = zonegroup.zones[0:3]
    assert(z1 == zonegroup.master_zone)

    # kill zone 3 gateways to freeze sync status to incremental in first period
    z3.stop()

    # change master to zone 2 -> period 2
    set_master_zone(z2)
    mdlog_periods += [realm.current_period.id]

    for zone_conn, _ in zone_bucket:
        if zone_conn.zone == z3:
            continue
        bucket_name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
        bucket = zone_conn.conn.create_bucket(bucket_name)
        buckets.append(bucket_name)

    # wait for zone 1 to sync
    zone_meta_checkpoint(z1)

    # change master back to zone 1 -> period 3
    set_master_zone(z1)
    mdlog_periods += [realm.current_period.id]

    for zone_conn, _ in zone_bucket:
        if zone_conn.zone == z3:
            continue
        bucket_name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
        bucket = zone_conn.conn.create_bucket(bucket_name)
        buckets.append(bucket_name)

    # restart zone 3 gateway and wait for sync
    z3.start()
    zonegroup_meta_checkpoint(zonegroup)

    # verify that we end up with the same objects
    for bucket_name in buckets:
        for source_conn, _ in zone_bucket:
            for target_conn in zonegroup_conns.zones:
                if source_conn.zone == target_conn.zone:
                    continue

                target_conn.check_bucket_eq(source_conn, bucket_name)

    # verify that mdlogs are not empty and match for each period
    for period in mdlog_periods:
        master_mdlog = mdlog_list(z1, period)
        assert len(master_mdlog) > 0
        for zone in zonegroup.zones:
            if zone == z1:
                continue
            mdlog = mdlog_list(zone, period)
            assert len(mdlog) == len(master_mdlog)

    # autotrim mdlogs for master zone
    mdlog_autotrim(z1)

    # autotrim mdlogs for peers
    for zone in zonegroup.zones:
        if zone == z1:
            continue
        mdlog_autotrim(zone)

    # verify that mdlogs are empty for each period
    for period in mdlog_periods:
        for zone in zonegroup.zones:
            mdlog = mdlog_list(zone, period)
            assert len(mdlog) == 0

def test_zonegroup_remove():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    if len(zonegroup.zones) < 2:
        raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")

    zonegroup_meta_checkpoint(zonegroup)
    z1, z2 = zonegroup.zones[0:2]
    c1, c2 = (z1.cluster, z2.cluster)

    # create a new zone in zonegroup on c2 and commit
    zone = Zone('remove', zonegroup, c2)
    zone.create(c2)
    zonegroup.zones.append(zone)
    zonegroup.period.update(zone, commit=True)

    zonegroup.remove(c1, zone)

    # another 'zonegroup remove' should fail with ENOENT
    _, retcode = zonegroup.remove(c1, zone, check_retcode=False)
    assert(retcode == 2) # ENOENT

    # delete the new zone
    zone.delete(c2)

    # validate the resulting period
    zonegroup.period.update(z1, commit=True)

def test_set_bucket_website():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        website_cfg = WebsiteConfiguration(suffix='index.html', error_key='error.html')
        try:
            bucket.set_website_configuration(website_cfg)
        except boto.exception.S3ResponseError as e:
            if e.error_code == 'MethodNotAllowed':
                raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.")
        assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml())

def test_set_bucket_policy():
    policy = '''{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": "*"
  }]
}'''
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        bucket.set_policy(policy)
        assert(bucket.get_policy() == policy)

def test_bucket_sync_disable():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)

    for zone in zonegroup.zones:
        check_buckets_sync_status_obj_not_exist(zone, buckets)

def test_bucket_sync_enable_right_after_disable():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = ['obj1', 'obj2', 'obj3', 'obj4']
    content = 'asdasd'

    for zone, bucket in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)
        enable_bucket_sync(realm.meta_master_zone(), bucket_name)

    objnames_2 = ['obj5', 'obj6', 'obj7', 'obj8']

    for zone, bucket in zone_bucket:
        for objname in objnames_2:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

def test_bucket_sync_disable_enable():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = [ 'obj1', 'obj2', 'obj3', 'obj4' ]
    content = 'asdasd'

    for zone, bucket in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)

    zonegroup_meta_checkpoint(zonegroup)

    objnames_2 = [ 'obj5', 'obj6', 'obj7', 'obj8' ]

    for zone, bucket in zone_bucket:
        for objname in objnames_2:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    for bucket_name in buckets:
        enable_bucket_sync(realm.meta_master_zone(), bucket_name)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

def test_multipart_object_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    _, bucket = zone_bucket[0]

    # initiate a multipart upload
    upload = bucket.initiate_multipart_upload('MULTIPART')
    mp = boto.s3.multipart.MultiPartUpload(bucket)
    mp.key_name = upload.key_name
    mp.id = upload.id
    part_size = 5 * 1024 * 1024 # 5M min part size
    mp.upload_part_from_file(StringIO('a' * part_size), 1)
    mp.upload_part_from_file(StringIO('b' * part_size), 2)
    mp.upload_part_from_file(StringIO('c' * part_size), 3)
    mp.upload_part_from_file(StringIO('d' * part_size), 4)
    mp.complete_upload()

    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

def test_encrypted_object_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    (zone1, zone2) = zonegroup_conns.rw_zones[0:2]

    # create a bucket on the first zone
    bucket_name = gen_bucket_name()
    log.info('create bucket zone=%s name=%s', zone1.name, bucket_name)
    bucket = zone1.conn.create_bucket(bucket_name)

    # upload an object with sse-c encryption
    sse_c_headers = {
        'x-amz-server-side-encryption-customer-algorithm': 'AES256',
        'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
        'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
    }
    key = bucket.new_key('testobj-sse-c')
    data = 'A'*512
    key.set_contents_from_string(data, headers=sse_c_headers)

    # upload an object with sse-kms encryption
    sse_kms_headers = {
        'x-amz-server-side-encryption': 'aws:kms',
        # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this)
        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
    }
    key = bucket.new_key('testobj-sse-kms')
    key.set_contents_from_string(data, headers=sse_kms_headers)

    # wait for the bucket metadata and data to sync
    zonegroup_meta_checkpoint(zonegroup)
    zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name)

    # read the encrypted objects from the second zone
    bucket2 = get_bucket(zone2, bucket_name)
    key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers)
    eq(data, key.get_contents_as_string(headers=sse_c_headers))

    key = bucket2.get_key('testobj-sse-kms')
    eq(data, key.get_contents_as_string())

def test_bucket_index_log_trim():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    zone = zonegroup_conns.rw_zones[0]

    # create a test bucket, upload some objects, and wait for sync
    def make_test_bucket():
        name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone.name, name)
        bucket = zone.conn.create_bucket(name)
        for objname in ('a', 'b', 'c', 'd'):
            k = new_key(zone, name, objname)
            k.set_contents_from_string('foo')
        zonegroup_meta_checkpoint(zonegroup)
        zonegroup_bucket_checkpoint(zonegroup_conns, name)
        return bucket

    # create a 'cold' bucket
    cold_bucket = make_test_bucket()

    # trim with max-buckets=0 to clear counters for cold bucket. this should
    # prevent it from being considered 'active' by the next autotrim
    bilog_autotrim(zone.zone, [
        '--rgw-sync-log-trim-max-buckets', '0',
    ])

    # create an 'active' bucket
    active_bucket = make_test_bucket()

    # trim with max-buckets=1 min-cold-buckets=0 to trim active bucket only
    bilog_autotrim(zone.zone, [
        '--rgw-sync-log-trim-max-buckets', '1',
        '--rgw-sync-log-trim-min-cold-buckets', '0',
    ])

    # verify active bucket has empty bilog
    active_bilog = bilog_list(zone.zone, active_bucket.name)
    assert(len(active_bilog) == 0)

    # verify cold bucket has nonempty bilog
    cold_bilog = bilog_list(zone.zone, cold_bucket.name)
    assert(len(cold_bilog) > 0)

    # trim with min-cold-buckets=999 to trim all buckets
    bilog_autotrim(zone.zone, [
        '--rgw-sync-log-trim-max-buckets', '999',
        '--rgw-sync-log-trim-min-cold-buckets', '999',
    ])

    # verify cold bucket has empty bilog
    cold_bilog = bilog_list(zone.zone, cold_bucket.name)
    assert(len(cold_bilog) == 0)