ceph/src/test/rgw/rgw_multi/tests.py
1 import json
2 import random
3 import string
4 import sys
5 import time
6 import logging
7 import errno
8
9 try:
10 from itertools import izip_longest as zip_longest
11 except ImportError:
12 from itertools import zip_longest
13 from itertools import combinations
14 from six import StringIO
15
16 import boto
17 import boto.s3.connection
18 from boto.s3.website import WebsiteConfiguration
19 from boto.s3.cors import CORSConfiguration
20
21 from nose.tools import eq_ as eq
22 from nose.plugins.attrib import attr
23 from nose.plugins.skip import SkipTest
24
25 from .multisite import Zone, ZoneGroup, Credentials
26
27 from .conn import get_gateway_connection
28 from .tools import assert_raises
29
30 class Config:
31 """ test configuration """
32 def __init__(self, **kwargs):
33 # by default, wait up to 5 minutes before giving up on a sync checkpoint
34 self.checkpoint_retries = kwargs.get('checkpoint_retries', 60)
35 self.checkpoint_delay = kwargs.get('checkpoint_delay', 5)
36 # allow some time for realm reconfiguration after changing master zone
37 self.reconfigure_delay = kwargs.get('reconfigure_delay', 5)
38
39 # rgw multisite tests, written against the interfaces provided in rgw_multi.
40 # these tests must be initialized and run by another module that provides
41 # implementations of these interfaces by calling init_multi()
42 realm = None
43 user = None
44 config = None
45 def init_multi(_realm, _user, _config=None):
46 global realm
47 realm = _realm
48 global user
49 user = _user
50 global config
51 config = _config or Config()
52 realm_meta_checkpoint(realm)
53
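# Illustrative usage (hypothetical driver code; in practice a harness such as
# test_multi.py builds the realm/user objects from its own configuration):
#
#   from rgw_multi import tests
#   tests.init_multi(realm, user, tests.Config(checkpoint_retries=120, checkpoint_delay=2))
#   # the test_* functions below can then be run directly or discovered by nose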
54 def get_realm():
55 return realm
56
57 log = logging.getLogger('rgw_multi.tests')
58
59 num_buckets = 0
60 run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
61
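# note: the module-level definition below rebinds the get_gateway_connection name
# imported from .conn above; it also caches the boto connection on the gateway object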
62 def get_gateway_connection(gateway, credentials):
63 """ connect to the given gateway """
64 if gateway.connection is None:
65 gateway.connection = boto.connect_s3(
66 aws_access_key_id = credentials.access_key,
67 aws_secret_access_key = credentials.secret,
68 host = gateway.host,
69 port = gateway.port,
70 is_secure = False,
71 calling_format = boto.s3.connection.OrdinaryCallingFormat())
72 return gateway.connection
73
74 def get_zone_connection(zone, credentials):
75 """ connect to the zone's first gateway """
76 if isinstance(credentials, list):
77 credentials = credentials[0]
78 return get_gateway_connection(zone.gateways[0], credentials)
79
80 def mdlog_list(zone, period = None):
81 cmd = ['mdlog', 'list']
82 if period:
83 cmd += ['--period', period]
84 (mdlog_json, _) = zone.cluster.admin(cmd, read_only=True)
85 mdlog_json = mdlog_json.decode('utf-8')
86 return json.loads(mdlog_json)
87
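# note: this definition is rebound by the second meta_sync_status() further below
# (after parse_meta_sync_status), so this retry-only variant is effectively unused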
88 def meta_sync_status(zone):
89 while True:
90 cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
91 meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
92 if retcode == 0:
93 break
94 assert(retcode == 2) # ENOENT
95 time.sleep(5)
96
97 def mdlog_autotrim(zone):
98 zone.cluster.admin(['mdlog', 'autotrim'])
99
100 def datalog_list(zone, period = None):  # note: the 'period' argument is currently unused
101 cmd = ['datalog', 'list']
102 (datalog_json, _) = zone.cluster.admin(cmd, read_only=True)
103 datalog_json = datalog_json.decode('utf-8')
104 return json.loads(datalog_json)
105
106 def datalog_autotrim(zone):
107 zone.cluster.admin(['datalog', 'autotrim'])
108
109 def bilog_list(zone, bucket, args = None):
110 cmd = ['bilog', 'list', '--bucket', bucket] + (args or [])
111 bilog, _ = zone.cluster.admin(cmd, read_only=True)
112 bilog = bilog.decode('utf-8')
113 return json.loads(bilog)
114
115 def bilog_autotrim(zone, args = None):
116 zone.cluster.admin(['bilog', 'autotrim'] + (args or []))
117
118 def parse_meta_sync_status(meta_sync_status_json):
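# parse 'metadata sync status' JSON into (period, realm_epoch, num_shards, markers);
# shards still in full sync (state 0) or tracking an older realm epoch report an empty marker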
119 meta_sync_status_json = meta_sync_status_json.decode('utf-8')
120 log.debug('current meta sync status=%s', meta_sync_status_json)
121 sync_status = json.loads(meta_sync_status_json)
122
123 sync_info = sync_status['sync_status']['info']
124 global_sync_status = sync_info['status']
125 num_shards = sync_info['num_shards']
126 period = sync_info['period']
127 realm_epoch = sync_info['realm_epoch']
128
129 sync_markers=sync_status['sync_status']['markers']
130 log.debug('sync_markers=%s', sync_markers)
131 assert(num_shards == len(sync_markers))
132
133 markers={}
134 for i in range(num_shards):
135 # get marker, only if it's an incremental marker for the same realm epoch
136 if realm_epoch > sync_markers[i]['val']['realm_epoch'] or sync_markers[i]['val']['state'] == 0:
137 markers[i] = ''
138 else:
139 markers[i] = sync_markers[i]['val']['marker']
140
141 return period, realm_epoch, num_shards, markers
142
143 def meta_sync_status(zone):
144 for _ in range(config.checkpoint_retries):
145 cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
146 meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
147 if retcode == 0:
148 return parse_meta_sync_status(meta_sync_status_json)
149 assert(retcode == 2) # ENOENT
150 time.sleep(config.checkpoint_delay)
151
152 assert False, 'failed to read metadata sync status for zone=%s' % zone.name
153
154 def meta_master_log_status(master_zone):
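# return the per-shard mdlog positions on the metadata master; these are the
# targets that the other zones' sync markers must reach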
155 cmd = ['mdlog', 'status'] + master_zone.zone_args()
156 mdlog_status_json, retcode = master_zone.cluster.admin(cmd, read_only=True)
157 mdlog_status = json.loads(mdlog_status_json.decode('utf-8'))
158
159 markers = {i: s['marker'] for i, s in enumerate(mdlog_status)}
160 log.debug('master meta markers=%s', markers)
161 return markers
162
163 def compare_meta_status(zone, log_status, sync_status):
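# return True only when every shard's sync marker has caught up to the master's log marker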
164 if len(log_status) != len(sync_status):
165 log.error('len(log_status)=%d, len(sync_status)=%d', len(log_status), len(sync_status))
166 return False
167
168 msg = ''
169 for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
170 if l > s:
171 if len(msg):
172 msg += ', '
173 msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
174
175 if len(msg) > 0:
176 log.warning('zone %s behind master: %s', zone.name, msg)
177 return False
178
179 return True
180
181 def zone_meta_checkpoint(zone, meta_master_zone = None, master_status = None):
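# poll the zone's metadata sync status (up to config.checkpoint_retries times, sleeping
# config.checkpoint_delay between attempts) until it catches up with master_status;
# the comparison is skipped while the zone is still syncing an older realm epoch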
182 if not meta_master_zone:
183 meta_master_zone = zone.realm().meta_master_zone()
184 if not master_status:
185 master_status = meta_master_log_status(meta_master_zone)
186
187 current_realm_epoch = realm.current_period.data['realm_epoch']
188
189 log.info('starting meta checkpoint for zone=%s', zone.name)
190
191 for _ in range(config.checkpoint_retries):
192 period, realm_epoch, num_shards, sync_status = meta_sync_status(zone)
193 if realm_epoch < current_realm_epoch:
194 log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d',
195 zone.name, realm_epoch, current_realm_epoch)
196 else:
197 log.debug('log_status=%s', master_status)
198 log.debug('sync_status=%s', sync_status)
199 if compare_meta_status(zone, master_status, sync_status):
200 log.info('finish meta checkpoint for zone=%s', zone.name)
201 return
202
203 time.sleep(config.checkpoint_delay)
204 assert False, 'failed meta checkpoint for zone=%s' % zone.name
205
206 def zonegroup_meta_checkpoint(zonegroup, meta_master_zone = None, master_status = None):
207 if not meta_master_zone:
208 meta_master_zone = zonegroup.realm().meta_master_zone()
209 if not master_status:
210 master_status = meta_master_log_status(meta_master_zone)
211
212 for zone in zonegroup.zones:
213 if zone == meta_master_zone:
214 continue
215 zone_meta_checkpoint(zone, meta_master_zone, master_status)
216
217 def realm_meta_checkpoint(realm):
218 log.info('meta checkpoint')
219
220 meta_master_zone = realm.meta_master_zone()
221 master_status = meta_master_log_status(meta_master_zone)
222
223 for zonegroup in realm.current_period.zonegroups:
224 zonegroup_meta_checkpoint(zonegroup, meta_master_zone, master_status)
225
226 def parse_data_sync_status(data_sync_status_json):
227 data_sync_status_json = data_sync_status_json.decode('utf-8')
228 log.debug('current data sync status=%s', data_sync_status_json)
229 sync_status = json.loads(data_sync_status_json)
230
231 global_sync_status=sync_status['sync_status']['info']['status']
232 num_shards=sync_status['sync_status']['info']['num_shards']
233
234 sync_markers=sync_status['sync_status']['markers']
235 log.debug('sync_markers=%s', sync_markers)
236 assert(num_shards == len(sync_markers))
237
238 markers={}
239 for i in range(num_shards):
240 markers[i] = sync_markers[i]['val']['marker']
241
242 return (num_shards, markers)
243
244 def data_sync_status(target_zone, source_zone):
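# return (num_shards, markers) from 'data sync status' against the given source zone,
# retrying while the status objects don't exist yet (ENOENT); None if target == source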
245 if target_zone == source_zone:
246 return None
247
248 for _ in range(config.checkpoint_retries):
249 cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
250 cmd += ['--source-zone', source_zone.name]
251 data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
252 if retcode == 0:
253 return parse_data_sync_status(data_sync_status_json)
254
255 assert(retcode == 2) # ENOENT
256 time.sleep(config.checkpoint_delay)
257
258 assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \
259 (target_zone.name, source_zone.name)
260
261 def bucket_sync_status(target_zone, source_zone, bucket_name):
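# return a dict of per-shard bucket sync positions from 'bucket sync markers';
# shards not yet in incremental sync report an empty string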
262 if target_zone == source_zone:
263 return None
264
265 cmd = ['bucket', 'sync', 'markers'] + target_zone.zone_args()
266 cmd += ['--source-zone', source_zone.name]
267 cmd += ['--bucket', bucket_name]
268 while True:
269 bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
270 if retcode == 0:
271 break
272
273 assert(retcode == 2) # ENOENT
274
275 bucket_sync_status_json = bucket_sync_status_json.decode('utf-8')
276 log.debug('current bucket sync markers=%s', bucket_sync_status_json)
277 sync_status = json.loads(bucket_sync_status_json)
278
279 markers={}
280 for entry in sync_status:
281 val = entry['val']
282 if val['status'] == 'incremental-sync':
283 pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
284 else:
285 pos = ''
286 markers[entry['key']] = pos
287
288 return markers
289
290 def data_source_log_status(source_zone):
291 source_cluster = source_zone.cluster
292 cmd = ['datalog', 'status'] + source_zone.zone_args()
293 datalog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
294 datalog_status = json.loads(datalog_status_json.decode('utf-8'))
295
296 markers = {i: s['marker'] for i, s in enumerate(datalog_status)}
297 log.debug('data markers for zone=%s markers=%s', source_zone.name, markers)
298 return markers
299
300 def bucket_source_log_status(source_zone, bucket_name):
301 cmd = ['bilog', 'status'] + source_zone.zone_args()
302 cmd += ['--bucket', bucket_name]
303 source_cluster = source_zone.cluster
304 bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
305 bilog_status = json.loads(bilog_status_json.decode('utf-8'))
306
307 m={}
308 markers={}
309 try:
310 m = bilog_status['markers']
311 except:
312 pass
313
314 for s in m:
315 key = s['key']
316 val = s['val']
317 markers[key] = val
318
319 log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers)
320 return markers
321
322 def compare_data_status(target_zone, source_zone, log_status, sync_status):
323 if len(log_status) != len(sync_status):
324 log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
325 return False
326
327 msg = ''
328 for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
329 if l > s:
330 if len(msg):
331 msg += ', '
332 msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
333
334 if len(msg) > 0:
335 log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, msg)
336 return False
337
338 return True
339
340 def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
341 if len(log_status) != len(sync_status):
342 log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
343 return False
344
345 msg = ''
346 for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
347 if l > s:
348 if len(msg):
349 msg += ', '
350 msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
351
352 if len(msg) > 0:
353 log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, msg)
354 return False
355
356 return True
357
358 def zone_data_checkpoint(target_zone, source_zone):
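# block until target_zone's data sync from source_zone reaches the source's
# current datalog positions, or fail after config.checkpoint_retries attempts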
359 if target_zone == source_zone:
360 return
361
362 log_status = data_source_log_status(source_zone)
363 log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)
364
365 for _ in range(config.checkpoint_retries):
366 num_shards, sync_status = data_sync_status(target_zone, source_zone)
367
368 log.debug('log_status=%s', log_status)
369 log.debug('sync_status=%s', sync_status)
370
371 if compare_data_status(target_zone, source_zone, log_status, sync_status):
372 log.info('finished data checkpoint for target_zone=%s source_zone=%s',
373 target_zone.name, source_zone.name)
374 return
375 time.sleep(config.checkpoint_delay)
376
377 assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
378 (target_zone.name, source_zone.name)
379
380 def zonegroup_data_checkpoint(zonegroup_conns):
381 for source_conn in zonegroup_conns.rw_zones:
382 for target_conn in zonegroup_conns.zones:
383 if source_conn.zone == target_conn.zone:
384 continue
385 log.debug('data checkpoint: source=%s target=%s', source_conn.zone.name, target_conn.zone.name)
386 zone_data_checkpoint(target_conn.zone, source_conn.zone)
387
388 def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
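# block until target_zone's bucket sync from source_zone reaches the source's
# current bilog positions for this bucket, or fail after config.checkpoint_retries attempts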
389 if target_zone == source_zone:
390 return
391
392 log_status = bucket_source_log_status(source_zone, bucket_name)
393 log.info('starting bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
394
395 for _ in range(config.checkpoint_retries):
396 sync_status = bucket_sync_status(target_zone, source_zone, bucket_name)
397
398 log.debug('log_status=%s', log_status)
399 log.debug('sync_status=%s', sync_status)
400
401 if compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
402 log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
403 return
404
405 time.sleep(config.checkpoint_delay)
406
407 assert False, 'failed bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \
408 (target_zone.name, source_zone.name, bucket_name)
409
410 def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
411 for source_conn in zonegroup_conns.rw_zones:
412 for target_conn in zonegroup_conns.zones:
413 if source_conn.zone == target_conn.zone:
414 continue
415 log.debug('bucket checkpoint: source=%s target=%s bucket=%s', source_conn.zone.name, target_conn.zone.name, bucket_name)
416 zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket_name)
417 for source_conn, target_conn in combinations(zonegroup_conns.zones, 2):
418 if target_conn.zone.has_buckets():
419 target_conn.check_bucket_eq(source_conn, bucket_name)
420
421 def set_master_zone(zone):
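# promote the zone to metadata master and commit a new period, then wait
# config.reconfigure_delay seconds for the gateways to pick up the change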
422 zone.modify(zone.cluster, ['--master'])
423 zonegroup = zone.zonegroup
424 zonegroup.period.update(zone, commit=True)
425 zonegroup.master_zone = zone
426 log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay)
427 time.sleep(config.reconfigure_delay)
428
429 def set_sync_from_all(zone, flag):
430 s = 'true' if flag else 'false'
431 zone.modify(zone.cluster, ['--sync-from-all={}'.format(s)])
432 zonegroup = zone.zonegroup
433 zonegroup.period.update(zone, commit=True)
434 log.info('Set sync_from_all flag on zone %s to %s', zone.name, s)
435 time.sleep(config.reconfigure_delay)
436
437 def set_redirect_zone(zone, redirect_zone):
438 id_str = redirect_zone.id if redirect_zone else ''
439 zone.modify(zone.cluster, ['--redirect-zone={}'.format(id_str)])
440 zonegroup = zone.zonegroup
441 zonegroup.period.update(zone, commit=True)
442 log.info('Set redirect_zone zone %s to "%s"', zone.name, id_str)
443 time.sleep(config.reconfigure_delay)
444
445 def enable_bucket_sync(zone, bucket_name):
446 cmd = ['bucket', 'sync', 'enable', '--bucket', bucket_name] + zone.zone_args()
447 zone.cluster.admin(cmd)
448
449 def disable_bucket_sync(zone, bucket_name):
450 cmd = ['bucket', 'sync', 'disable', '--bucket', bucket_name] + zone.zone_args()
451 zone.cluster.admin(cmd)
452
453 def check_buckets_sync_status_obj_not_exist(zone, buckets):
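# poll 'radosgw-admin log list' until none of the given bucket names appear in the
# output, failing after config.checkpoint_retries attempts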
454 for _ in range(config.checkpoint_retries):
455 cmd = ['log', 'list'] + zone.zone_arg()
456 log_list, ret = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
457 for bucket in buckets:
458 if log_list.find(':'+bucket+":") >= 0:
459 break
460 else:
461 return
462 time.sleep(config.checkpoint_delay)
463 assert False
464
465 def gen_bucket_name():
466 global num_buckets
467
468 num_buckets += 1
469 return run_prefix + '-' + str(num_buckets)
470
471 class ZonegroupConns:
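# open an S3 connection to each zone in the zonegroup and partition them into
# read-only and read-write lists, remembering the master zone's connection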
472 def __init__(self, zonegroup):
473 self.zonegroup = zonegroup
474 self.zones = []
475 self.ro_zones = []
476 self.rw_zones = []
477 self.master_zone = None
478 for z in zonegroup.zones:
479 zone_conn = z.get_conn(user.credentials)
480 self.zones.append(zone_conn)
481 if z.is_read_only():
482 self.ro_zones.append(zone_conn)
483 else:
484 self.rw_zones.append(zone_conn)
485
486 if z == zonegroup.master_zone:
487 self.master_zone = zone_conn
488
489 def check_all_buckets_exist(zone_conn, buckets):
490 if not zone_conn.zone.has_buckets():
491 return True
492
493 for b in buckets:
494 try:
495 zone_conn.get_bucket(b)
496 except:
497 log.critical('zone %s does not contain bucket %s', zone_conn.name, b)
498 return False
499
500 return True
501
502 def check_all_buckets_dont_exist(zone_conn, buckets):
503 if not zone_conn.zone.has_buckets():
504 return True
505
506 for b in buckets:
507 try:
508 zone_conn.get_bucket(b)
509 except:
510 continue
511
512 log.critical('zone %s contains bucket %s', zone_conn.name, b)
513 return False
514
515 return True
516
517 def create_bucket_per_zone(zonegroup_conns, buckets_per_zone = 1):
518 buckets = []
519 zone_bucket = []
520 for zone in zonegroup_conns.rw_zones:
521 for i in range(buckets_per_zone):
522 bucket_name = gen_bucket_name()
523 log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
524 bucket = zone.create_bucket(bucket_name)
525 buckets.append(bucket_name)
526 zone_bucket.append((zone, bucket))
527
528 return buckets, zone_bucket
529
530 def create_bucket_per_zone_in_realm():
531 buckets = []
532 zone_bucket = []
533 for zonegroup in realm.current_period.zonegroups:
534 zg_conn = ZonegroupConns(zonegroup)
535 b, z = create_bucket_per_zone(zg_conn)
536 buckets.extend(b)
537 zone_bucket.extend(z)
538 return buckets, zone_bucket
539
540 def test_bucket_create():
541 zonegroup = realm.master_zonegroup()
542 zonegroup_conns = ZonegroupConns(zonegroup)
543 buckets, _ = create_bucket_per_zone(zonegroup_conns)
544 zonegroup_meta_checkpoint(zonegroup)
545
546 for zone in zonegroup_conns.zones:
547 assert check_all_buckets_exist(zone, buckets)
548
549 def test_bucket_recreate():
550 zonegroup = realm.master_zonegroup()
551 zonegroup_conns = ZonegroupConns(zonegroup)
552 buckets, _ = create_bucket_per_zone(zonegroup_conns)
553 zonegroup_meta_checkpoint(zonegroup)
554
555
556 for zone in zonegroup_conns.zones:
557 assert check_all_buckets_exist(zone, buckets)
558
559 # recreate buckets on all zones, make sure they weren't removed
560 for zone in zonegroup_conns.rw_zones:
561 for bucket_name in buckets:
562 bucket = zone.create_bucket(bucket_name)
563
564 for zone in zonegroup_conns.zones:
565 assert check_all_buckets_exist(zone, buckets)
566
567 zonegroup_meta_checkpoint(zonegroup)
568
569 for zone in zonegroup_conns.zones:
570 assert check_all_buckets_exist(zone, buckets)
571
572 def test_bucket_remove():
573 zonegroup = realm.master_zonegroup()
574 zonegroup_conns = ZonegroupConns(zonegroup)
575 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
576 zonegroup_meta_checkpoint(zonegroup)
577
578 for zone in zonegroup_conns.zones:
579 assert check_all_buckets_exist(zone, buckets)
580
581 for zone, bucket_name in zone_bucket:
582 zone.conn.delete_bucket(bucket_name)
583
584 zonegroup_meta_checkpoint(zonegroup)
585
586 for zone in zonegroup_conns.zones:
587 assert check_all_buckets_dont_exist(zone, buckets)
588
589 def get_bucket(zone, bucket_name):
590 return zone.conn.get_bucket(bucket_name)
591
592 def get_key(zone, bucket_name, obj_name):
593 b = get_bucket(zone, bucket_name)
594 return b.get_key(obj_name)
595
596 def new_key(zone, bucket_name, obj_name):
597 b = get_bucket(zone, bucket_name)
598 return b.new_key(obj_name)
599
600 def check_bucket_eq(zone_conn1, zone_conn2, bucket):
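# verify zone_conn2 sees the same bucket contents as zone_conn1; skipped when the
# target zone's tier does not store bucket data (has_buckets() is false)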
601 if zone_conn2.zone.has_buckets():
602 zone_conn2.check_bucket_eq(zone_conn1, bucket.name)
603
604 def test_object_sync():
605 zonegroup = realm.master_zonegroup()
606 zonegroup_conns = ZonegroupConns(zonegroup)
607 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
608
609 objnames = [ 'myobj', '_myobj', ':', '&' ]
610 content = 'asdasd'
611
612 # don't wait for meta sync just yet
613 for zone, bucket_name in zone_bucket:
614 for objname in objnames:
615 k = new_key(zone, bucket_name, objname)
616 k.set_contents_from_string(content)
617
618 zonegroup_meta_checkpoint(zonegroup)
619
620 for source_conn, bucket in zone_bucket:
621 for target_conn in zonegroup_conns.zones:
622 if source_conn.zone == target_conn.zone:
623 continue
624
625 zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
626 check_bucket_eq(source_conn, target_conn, bucket)
627
628 def test_object_delete():
629 zonegroup = realm.master_zonegroup()
630 zonegroup_conns = ZonegroupConns(zonegroup)
631 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
632
633 objname = 'myobj'
634 content = 'asdasd'
635
636 # don't wait for meta sync just yet
637 for zone, bucket in zone_bucket:
638 k = new_key(zone, bucket, objname)
639 k.set_contents_from_string(content)
640
641 zonegroup_meta_checkpoint(zonegroup)
642
643 # check object exists
644 for source_conn, bucket in zone_bucket:
645 for target_conn in zonegroup_conns.zones:
646 if source_conn.zone == target_conn.zone:
647 continue
648
649 zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
650 check_bucket_eq(source_conn, target_conn, bucket)
651
652 # check object removal
653 for source_conn, bucket in zone_bucket:
654 k = get_key(source_conn, bucket, objname)
655 k.delete()
656 for target_conn in zonegroup_conns.zones:
657 if source_conn.zone == target_conn.zone:
658 continue
659
660 zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
661 check_bucket_eq(source_conn, target_conn, bucket)
662
663 def get_latest_object_version(key):
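# list the object's versions and return the one flagged is_latest, or None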
664 for k in key.bucket.list_versions(key.name):
665 if k.is_latest:
666 return k
667 return None
668
669 def test_versioned_object_incremental_sync():
670 zonegroup = realm.master_zonegroup()
671 zonegroup_conns = ZonegroupConns(zonegroup)
672 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
673
674 # enable versioning
675 for _, bucket in zone_bucket:
676 bucket.configure_versioning(True)
677
678 zonegroup_meta_checkpoint(zonegroup)
679
680 # upload a dummy object to each bucket and wait for sync. this forces each
681 # bucket to finish a full sync and switch to incremental
682 for source_conn, bucket in zone_bucket:
683 new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
684 for target_conn in zonegroup_conns.zones:
685 if source_conn.zone == target_conn.zone:
686 continue
687 zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
688
689 for _, bucket in zone_bucket:
690 # create and delete multiple versions of an object from each zone
691 for zone_conn in zonegroup_conns.rw_zones:
692 obj = 'obj-' + zone_conn.name
693 k = new_key(zone_conn, bucket, obj)
694
695 k.set_contents_from_string('version1')
696 log.debug('version1 id=%s', k.version_id)
697 # don't delete version1 - this tests that the initial version
698 # doesn't get squashed into later versions
699
700 # create and delete the following object versions to test that
701 # the operations don't race with each other during sync
702 k.set_contents_from_string('version2')
703 log.debug('version2 id=%s', k.version_id)
704 k.bucket.delete_key(obj, version_id=k.version_id)
705
706 k.set_contents_from_string('version3')
707 log.debug('version3 id=%s', k.version_id)
708 k.bucket.delete_key(obj, version_id=k.version_id)
709
710 for _, bucket in zone_bucket:
711 zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
712
713 for _, bucket in zone_bucket:
714 # overwrite the acls to test that metadata-only entries are applied
715 for zone_conn in zonegroup_conns.rw_zones:
716 obj = 'obj-' + zone_conn.name
717 k = new_key(zone_conn, bucket.name, obj)
718 v = get_latest_object_version(k)
719 v.make_public()
720
721 for _, bucket in zone_bucket:
722 zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
723
724 def test_version_suspended_incremental_sync():
725 zonegroup = realm.master_zonegroup()
726 zonegroup_conns = ZonegroupConns(zonegroup)
727
728 zone = zonegroup_conns.rw_zones[0]
729
730 # create a non-versioned bucket
731 bucket = zone.create_bucket(gen_bucket_name())
732 log.debug('created bucket=%s', bucket.name)
733 zonegroup_meta_checkpoint(zonegroup)
734
735 # upload an initial object
736 key1 = new_key(zone, bucket, 'obj')
737 key1.set_contents_from_string('')
738 log.debug('created initial version id=%s', key1.version_id)
739 zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
740
741 # enable versioning
742 bucket.configure_versioning(True)
743 zonegroup_meta_checkpoint(zonegroup)
744
745 # re-upload the object as a new version
746 key2 = new_key(zone, bucket, 'obj')
747 key2.set_contents_from_string('')
748 log.debug('created new version id=%s', key2.version_id)
749 zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
750
751 # suspend versioning
752 bucket.configure_versioning(False)
753 zonegroup_meta_checkpoint(zonegroup)
754
755 # re-upload the object as a 'null' version
756 key3 = new_key(zone, bucket, 'obj')
757 key3.set_contents_from_string('')
758 log.debug('created null version id=%s', key3.version_id)
759 zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
760
761 def test_delete_marker_full_sync():
762 zonegroup = realm.master_zonegroup()
763 zonegroup_conns = ZonegroupConns(zonegroup)
764 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
765
766 # enable versioning
767 for _, bucket in zone_bucket:
768 bucket.configure_versioning(True)
769 zonegroup_meta_checkpoint(zonegroup)
770
771 for zone, bucket in zone_bucket:
772 # upload an initial object
773 key1 = new_key(zone, bucket, 'obj')
774 key1.set_contents_from_string('')
775
776 # create a delete marker
777 key2 = new_key(zone, bucket, 'obj')
778 key2.delete()
779
780 # wait for full sync
781 for _, bucket in zone_bucket:
782 zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
783
784 def test_suspended_delete_marker_full_sync():
785 zonegroup = realm.master_zonegroup()
786 zonegroup_conns = ZonegroupConns(zonegroup)
787 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
788
789 # enable/suspend versioning
790 for _, bucket in zone_bucket:
791 bucket.configure_versioning(True)
792 bucket.configure_versioning(False)
793 zonegroup_meta_checkpoint(zonegroup)
794
795 for zone, bucket in zone_bucket:
796 # upload an initial object
797 key1 = new_key(zone, bucket, 'obj')
798 key1.set_contents_from_string('')
799
800 # create a delete marker
801 key2 = new_key(zone, bucket, 'obj')
802 key2.delete()
803
804 # wait for full sync
805 for _, bucket in zone_bucket:
806 zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
807
808 def test_bucket_versioning():
809 buckets, zone_bucket = create_bucket_per_zone_in_realm()
810 for _, bucket in zone_bucket:
811 bucket.configure_versioning(True)
812 res = bucket.get_versioning_status()
813 key = 'Versioning'
814 assert(key in res and res[key] == 'Enabled')
815
816 def test_bucket_acl():
817 buckets, zone_bucket = create_bucket_per_zone_in_realm()
818 for _, bucket in zone_bucket:
819 assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
820 bucket.set_acl('public-read')
821 assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers
822
823 def test_bucket_cors():
824 buckets, zone_bucket = create_bucket_per_zone_in_realm()
825 for _, bucket in zone_bucket:
826 cors_cfg = CORSConfiguration()
827 cors_cfg.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000)
828 bucket.set_cors(cors_cfg)
829 assert(bucket.get_cors().to_xml() == cors_cfg.to_xml())
830
831 def test_bucket_delete_notempty():
832 zonegroup = realm.master_zonegroup()
833 zonegroup_conns = ZonegroupConns(zonegroup)
834 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
835 zonegroup_meta_checkpoint(zonegroup)
836
837 for zone_conn, bucket_name in zone_bucket:
838 # upload an object to each bucket on its own zone
839 conn = zone_conn.get_connection()
840 bucket = conn.get_bucket(bucket_name)
841 k = bucket.new_key('foo')
842 k.set_contents_from_string('bar')
843 # attempt to delete the bucket before this object can sync
844 try:
845 conn.delete_bucket(bucket_name)
846 except boto.exception.S3ResponseError as e:
847 assert(e.error_code == 'BucketNotEmpty')
848 continue
849 assert False # expected 409 BucketNotEmpty
850
851 # assert that each bucket still exists on the master
852 c1 = zonegroup_conns.master_zone.conn
853 for _, bucket_name in zone_bucket:
854 assert c1.get_bucket(bucket_name)
855
856 def test_multi_period_incremental_sync():
857 zonegroup = realm.master_zonegroup()
858 if len(zonegroup.zones) < 3:
859 raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")
860
861 # periods to include in mdlog comparison
862 mdlog_periods = [realm.current_period.id]
863
864 # create a bucket in each zone
865 zonegroup_conns = ZonegroupConns(zonegroup)
866 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
867
868 zonegroup_meta_checkpoint(zonegroup)
869
870 z1, z2, z3 = zonegroup.zones[0:3]
871 assert(z1 == zonegroup.master_zone)
872
873 # kill zone 3 gateways to freeze sync status to incremental in first period
874 z3.stop()
875
876 # change master to zone 2 -> period 2
877 set_master_zone(z2)
878 mdlog_periods += [realm.current_period.id]
879
880 for zone_conn, _ in zone_bucket:
881 if zone_conn.zone == z3:
882 continue
883 bucket_name = gen_bucket_name()
884 log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
885 bucket = zone_conn.conn.create_bucket(bucket_name)
886 buckets.append(bucket_name)
887
888 # wait for zone 1 to sync
889 zone_meta_checkpoint(z1)
890
891 # change master back to zone 1 -> period 3
892 set_master_zone(z1)
893 mdlog_periods += [realm.current_period.id]
894
895 for zone_conn, bucket_name in zone_bucket:
896 if zone_conn.zone == z3:
897 continue
898 bucket_name = gen_bucket_name()
899 log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
900 zone_conn.conn.create_bucket(bucket_name)
901 buckets.append(bucket_name)
902
903 # restart zone 3 gateway and wait for sync
904 z3.start()
905 zonegroup_meta_checkpoint(zonegroup)
906
907 # verify that we end up with the same objects
908 for bucket_name in buckets:
909 for source_conn, _ in zone_bucket:
910 for target_conn in zonegroup_conns.zones:
911 if source_conn.zone == target_conn.zone:
912 continue
913
914 if target_conn.zone.has_buckets():
915 target_conn.check_bucket_eq(source_conn, bucket_name)
916
917 # verify that mdlogs are not empty and match for each period
918 for period in mdlog_periods:
919 master_mdlog = mdlog_list(z1, period)
920 assert len(master_mdlog) > 0
921 for zone in zonegroup.zones:
922 if zone == z1:
923 continue
924 mdlog = mdlog_list(zone, period)
925 assert len(mdlog) == len(master_mdlog)
926
927 # autotrim mdlogs for master zone
928 mdlog_autotrim(z1)
929
930 # autotrim mdlogs for peers
931 for zone in zonegroup.zones:
932 if zone == z1:
933 continue
934 mdlog_autotrim(zone)
935
936 # verify that mdlogs are empty for each period
937 for period in mdlog_periods:
938 for zone in zonegroup.zones:
939 mdlog = mdlog_list(zone, period)
940 assert len(mdlog) == 0
941
942 def test_datalog_autotrim():
943 zonegroup = realm.master_zonegroup()
944 zonegroup_conns = ZonegroupConns(zonegroup)
945 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
946
947 # upload an object to each zone to generate a datalog entry
948 for zone, bucket in zone_bucket:
949 k = new_key(zone, bucket.name, 'key')
950 k.set_contents_from_string('body')
951
952 # wait for data sync to catch up
953 zonegroup_data_checkpoint(zonegroup_conns)
954
955 # trim each datalog
956 for zone, _ in zone_bucket:
957 datalog_autotrim(zone.zone)
958 datalog = datalog_list(zone.zone)
959 assert len(datalog) == 0
960
961 def test_multi_zone_redirect():
962 zonegroup = realm.master_zonegroup()
963 if len(zonegroup.rw_zones) < 2:
964 raise SkipTest("test_multi_zone_redirect skipped. Requires 2 or more read-write zones in master zonegroup.")
965
966 zonegroup_conns = ZonegroupConns(zonegroup)
967 (zc1, zc2) = zonegroup_conns.rw_zones[0:2]
968
969 z1, z2 = (zc1.zone, zc2.zone)
970
971 set_sync_from_all(z2, False)
972
973 # create a bucket on the first zone
974 bucket_name = gen_bucket_name()
975 log.info('create bucket zone=%s name=%s', z1.name, bucket_name)
976 bucket = zc1.conn.create_bucket(bucket_name)
977 obj = 'testredirect'
978
979 key = bucket.new_key(obj)
980 data = 'A'*512
981 key.set_contents_from_string(data)
982
983 zonegroup_meta_checkpoint(zonegroup)
984
985 # try to read object from second zone (should fail)
986 bucket2 = get_bucket(zc2, bucket_name)
987 assert_raises(boto.exception.S3ResponseError, bucket2.get_key, obj)
988
989 set_redirect_zone(z2, z1)
990
991 key2 = bucket2.get_key(obj)
992
993 eq(data, key2.get_contents_as_string())
994
995 key = bucket.new_key(obj)
996
997 for x in ['a', 'b', 'c', 'd']:
998 data = x*512
999 key.set_contents_from_string(data)
1000 eq(data, key2.get_contents_as_string())
1001
1002 # revert config changes
1003 set_sync_from_all(z2, True)
1004 set_redirect_zone(z2, None)
1005
1006 def test_zonegroup_remove():
1007 zonegroup = realm.master_zonegroup()
1008 zonegroup_conns = ZonegroupConns(zonegroup)
1009 if len(zonegroup.zones) < 2:
1010 raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")
1011
1012 zonegroup_meta_checkpoint(zonegroup)
1013 z1, z2 = zonegroup.zones[0:2]
1014 c1, c2 = (z1.cluster, z2.cluster)
1015
1016 # get admin credentials out of existing zone
1017 system_key = z1.data['system_key']
1018 admin_creds = Credentials(system_key['access_key'], system_key['secret_key'])
1019
1020 # create a new zone in zonegroup on c2 and commit
1021 zone = Zone('remove', zonegroup, c2)
1022 zone.create(c2, admin_creds.credential_args())
1023 zonegroup.zones.append(zone)
1024 zonegroup.period.update(zone, commit=True)
1025
1026 zonegroup.remove(c1, zone)
1027
1028 # another 'zonegroup remove' should fail with ENOENT
1029 _, retcode = zonegroup.remove(c1, zone, check_retcode=False)
1030 assert(retcode == 2) # ENOENT
1031
1032 # delete the new zone
1033 zone.delete(c2)
1034
1035 # validate the resulting period
1036 zonegroup.period.update(z1, commit=True)
1037
1038
1039 def test_zg_master_zone_delete():
1040
1041 master_zg = realm.master_zonegroup()
1042 master_zone = master_zg.master_zone
1043
1044 assert(len(master_zg.zones) >= 1)
1045 master_cluster = master_zg.zones[0].cluster
1046
1047 rm_zg = ZoneGroup('remove_zg')
1048 rm_zg.create(master_cluster)
1049
1050 rm_zone = Zone('remove', rm_zg, master_cluster)
1051 rm_zone.create(master_cluster)
1052 master_zg.period.update(master_zone, commit=True)
1053
1054
1055 rm_zone.delete(master_cluster)
1056 # Period update: this should now fail, because the deleted zone is still the
1057 # master zone of that zonegroup
1058 _, retcode = master_zg.period.update(master_zone, check_retcode=False)
1059 assert(retcode == errno.EINVAL)
1060
1061 # Proceed to delete the zonegroup as well; the new period no longer contains
1062 # a dangling master_zone, so this must succeed
1063 rm_zg.delete(master_cluster)
1064 master_zg.period.update(master_zone, commit=True)
1065
1066 def test_set_bucket_website():
1067 buckets, zone_bucket = create_bucket_per_zone_in_realm()
1068 for _, bucket in zone_bucket:
1069 website_cfg = WebsiteConfiguration(suffix='index.html',error_key='error.html')
1070 try:
1071 bucket.set_website_configuration(website_cfg)
1072 except boto.exception.S3ResponseError as e:
1073 if e.error_code == 'MethodNotAllowed':
1074 raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.")
1075 assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml())
1076
1077 def test_set_bucket_policy():
1078 policy = '''{
1079 "Version": "2012-10-17",
1080 "Statement": [{
1081 "Effect": "Allow",
1082 "Principal": "*"
1083 }]
1084 }'''
1085 buckets, zone_bucket = create_bucket_per_zone_in_realm()
1086 for _, bucket in zone_bucket:
1087 bucket.set_policy(policy)
1088 assert(bucket.get_policy() == policy)
1089
1090 def test_bucket_sync_disable():
1091 zonegroup = realm.master_zonegroup()
1092 zonegroup_conns = ZonegroupConns(zonegroup)
1093 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
1094
1095 for bucket_name in buckets:
1096 disable_bucket_sync(realm.meta_master_zone(), bucket_name)
1097
1098 for zone in zonegroup.zones:
1099 check_buckets_sync_status_obj_not_exist(zone, buckets)
1100
1101 zonegroup_data_checkpoint(zonegroup_conns)
1102
1103 def test_bucket_sync_enable_right_after_disable():
1104 zonegroup = realm.master_zonegroup()
1105 zonegroup_conns = ZonegroupConns(zonegroup)
1106 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
1107
1108 objnames = ['obj1', 'obj2', 'obj3', 'obj4']
1109 content = 'asdasd'
1110
1111 for zone, bucket in zone_bucket:
1112 for objname in objnames:
1113 k = new_key(zone, bucket.name, objname)
1114 k.set_contents_from_string(content)
1115
1116 for bucket_name in buckets:
1117 zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
1118
1119 for bucket_name in buckets:
1120 disable_bucket_sync(realm.meta_master_zone(), bucket_name)
1121 enable_bucket_sync(realm.meta_master_zone(), bucket_name)
1122
1123 objnames_2 = ['obj5', 'obj6', 'obj7', 'obj8']
1124
1125 for zone, bucket in zone_bucket:
1126 for objname in objnames_2:
1127 k = new_key(zone, bucket.name, objname)
1128 k.set_contents_from_string(content)
1129
1130 for bucket_name in buckets:
1131 zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
1132
1133 zonegroup_data_checkpoint(zonegroup_conns)
1134
1135 def test_bucket_sync_disable_enable():
1136 zonegroup = realm.master_zonegroup()
1137 zonegroup_conns = ZonegroupConns(zonegroup)
1138 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
1139
1140 objnames = [ 'obj1', 'obj2', 'obj3', 'obj4' ]
1141 content = 'asdasd'
1142
1143 for zone, bucket in zone_bucket:
1144 for objname in objnames:
1145 k = new_key(zone, bucket.name, objname)
1146 k.set_contents_from_string(content)
1147
1148 zonegroup_meta_checkpoint(zonegroup)
1149
1150 for bucket_name in buckets:
1151 zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
1152
1153 for bucket_name in buckets:
1154 disable_bucket_sync(realm.meta_master_zone(), bucket_name)
1155
1156 zonegroup_meta_checkpoint(zonegroup)
1157
1158 objnames_2 = [ 'obj5', 'obj6', 'obj7', 'obj8' ]
1159
1160 for zone, bucket in zone_bucket:
1161 for objname in objnames_2:
1162 k = new_key(zone, bucket.name, objname)
1163 k.set_contents_from_string(content)
1164
1165 for bucket_name in buckets:
1166 enable_bucket_sync(realm.meta_master_zone(), bucket_name)
1167
1168 for bucket_name in buckets:
1169 zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
1170
1171 zonegroup_data_checkpoint(zonegroup_conns)
1172
1173 def test_multipart_object_sync():
1174 zonegroup = realm.master_zonegroup()
1175 zonegroup_conns = ZonegroupConns(zonegroup)
1176 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
1177
1178 _, bucket = zone_bucket[0]
1179
1180 # initiate a multipart upload
1181 upload = bucket.initiate_multipart_upload('MULTIPART')
1182 mp = boto.s3.multipart.MultiPartUpload(bucket)
1183 mp.key_name = upload.key_name
1184 mp.id = upload.id
1185 part_size = 5 * 1024 * 1024 # 5M min part size
1186 mp.upload_part_from_file(StringIO('a' * part_size), 1)
1187 mp.upload_part_from_file(StringIO('b' * part_size), 2)
1188 mp.upload_part_from_file(StringIO('c' * part_size), 3)
1189 mp.upload_part_from_file(StringIO('d' * part_size), 4)
1190 mp.complete_upload()
1191
1192 zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
1193
1194 def test_encrypted_object_sync():
1195 zonegroup = realm.master_zonegroup()
1196 zonegroup_conns = ZonegroupConns(zonegroup)
1197
1198 if len(zonegroup.rw_zones) < 2:
1199 raise SkipTest("test_encrypted_object_sync skipped. Requires 2 or more read-write zones in master zonegroup.")
1200
1201 (zone1, zone2) = zonegroup_conns.rw_zones[0:2]
1202
1203 # create a bucket on the first zone
1204 bucket_name = gen_bucket_name()
1205 log.info('create bucket zone=%s name=%s', zone1.name, bucket_name)
1206 bucket = zone1.conn.create_bucket(bucket_name)
1207
1208 # upload an object with sse-c encryption
1209 sse_c_headers = {
1210 'x-amz-server-side-encryption-customer-algorithm': 'AES256',
1211 'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
1212 'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
1213 }
1214 key = bucket.new_key('testobj-sse-c')
1215 data = 'A'*512
1216 key.set_contents_from_string(data, headers=sse_c_headers)
1217
1218 # upload an object with sse-kms encryption
1219 sse_kms_headers = {
1220 'x-amz-server-side-encryption': 'aws:kms',
1221 # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this)
1222 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
1223 }
1224 key = bucket.new_key('testobj-sse-kms')
1225 key.set_contents_from_string(data, headers=sse_kms_headers)
1226
1227 # wait for the bucket metadata and data to sync
1228 zonegroup_meta_checkpoint(zonegroup)
1229 zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name)
1230
1231 # read the encrypted objects from the second zone
1232 bucket2 = get_bucket(zone2, bucket_name)
1233 key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers)
1234 eq(data, key.get_contents_as_string(headers=sse_c_headers))
1235
1236 key = bucket2.get_key('testobj-sse-kms')
1237 eq(data, key.get_contents_as_string())
1238
1239 def test_bucket_index_log_trim():
1240 zonegroup = realm.master_zonegroup()
1241 zonegroup_conns = ZonegroupConns(zonegroup)
1242
1243 zone = zonegroup_conns.rw_zones[0]
1244
1245 # create a test bucket, upload some objects, and wait for sync
1246 def make_test_bucket():
1247 name = gen_bucket_name()
1248 log.info('create bucket zone=%s name=%s', zone.name, name)
1249 bucket = zone.conn.create_bucket(name)
1250 for objname in ('a', 'b', 'c', 'd'):
1251 k = new_key(zone, name, objname)
1252 k.set_contents_from_string('foo')
1253 zonegroup_meta_checkpoint(zonegroup)
1254 zonegroup_bucket_checkpoint(zonegroup_conns, name)
1255 return bucket
1256
1257 # create a 'cold' bucket
1258 cold_bucket = make_test_bucket()
1259
1260 # trim with max-buckets=0 to clear counters for cold bucket. this should
1261 # prevent it from being considered 'active' by the next autotrim
1262 bilog_autotrim(zone.zone, [
1263 '--rgw-sync-log-trim-max-buckets', '0',
1264 ])
1265
1266 # create an 'active' bucket
1267 active_bucket = make_test_bucket()
1268
1269 # trim with max-buckets=1 min-cold-buckets=0 to trim active bucket only
1270 bilog_autotrim(zone.zone, [
1271 '--rgw-sync-log-trim-max-buckets', '1',
1272 '--rgw-sync-log-trim-min-cold-buckets', '0',
1273 ])
1274
1275 # verify active bucket has empty bilog
1276 active_bilog = bilog_list(zone.zone, active_bucket.name)
1277 assert(len(active_bilog) == 0)
1278
1279 # verify cold bucket has nonempty bilog
1280 cold_bilog = bilog_list(zone.zone, cold_bucket.name)
1281 assert(len(cold_bilog) > 0)
1282
1283 # trim with min-cold-buckets=999 to trim all buckets
1284 bilog_autotrim(zone.zone, [
1285 '--rgw-sync-log-trim-max-buckets', '999',
1286 '--rgw-sync-log-trim-min-cold-buckets', '999',
1287 ])
1288
1289 # verify cold bucket has empty bilog
1290 cold_bilog = bilog_list(zone.zone, cold_bucket.name)
1291 assert(len(cold_bilog) == 0)
1292
1293 def test_bucket_creation_time():
1294 zonegroup = realm.master_zonegroup()
1295 zonegroup_conns = ZonegroupConns(zonegroup)
1296
1297 zone_buckets = [zone.get_connection().get_all_buckets() for zone in zonegroup_conns.rw_zones]
1298 for z1, z2 in combinations(zone_buckets, 2):
1299 for a, b in zip(z1, z2):
1300 eq(a.name, b.name)
1301 eq(a.creation_date, b.creation_date)