]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/rgw_multi/tests.py
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / test / rgw / rgw_multi / tests.py
1 import json
2 import random
3 import string
4 import sys
5 import time
6 import logging
7 import errno
8
9 try:
10 from itertools import izip_longest as zip_longest
11 except ImportError:
12 from itertools import zip_longest
13 from itertools import combinations
14 from six import StringIO
15
16 import boto
17 import boto.s3.connection
18 from boto.s3.website import WebsiteConfiguration
19 from boto.s3.cors import CORSConfiguration
20
21 from nose.tools import eq_ as eq
22 from nose.plugins.attrib import attr
23 from nose.plugins.skip import SkipTest
24
25 from .multisite import Zone, ZoneGroup, Credentials
26
27 from .conn import get_gateway_connection
28 from .tools import assert_raises
29
class Config:
    """ test configuration

    Recognised keyword arguments (all optional):
      checkpoint_retries -- polling attempts per sync checkpoint (default 60)
      checkpoint_delay   -- seconds slept between attempts (default 5)
      reconfigure_delay  -- seconds slept after a realm change (default 5)
      tenant             -- tenant name (default '')
    """
    def __init__(self, **kwargs):
        defaults = (
            # 60 retries x 5 seconds: wait up to 5 minutes before giving up
            # on a sync checkpoint
            ('checkpoint_retries', 60),
            ('checkpoint_delay', 5),
            # allow some time for realm reconfiguration after changing
            # master zone
            ('reconfigure_delay', 5),
            ('tenant', ''),
        )
        for option, fallback in defaults:
            setattr(self, option, kwargs.get(option, fallback))
39
40 # rgw multisite tests, written against the interfaces provided in rgw_multi.
41 # these tests must be initialized and run by another module that provides
42 # implementations of these interfaces by calling init_multi()
# module state populated by init_multi(); everything is unset until then
realm = user = config = None
def init_multi(_realm, _user, _config=None):
    """ install the realm, user and configuration the tests run against

    A default Config() is used when _config is omitted (or falsy). Once the
    module globals are set, a realm-wide metadata checkpoint is run.
    """
    global realm, user, config
    realm, user = _realm, _user
    config = _config if _config else Config()
    realm_meta_checkpoint(realm)
54
def get_user():
    """ return the id of the configured test user, or '' when none is set """
    if user is None:
        return ''
    return user.id
57
def get_tenant():
    """ return the configured tenant, or '' when no config or tenant is set """
    if config is None or config.tenant is None:
        return ''
    return config.tenant
60
def get_realm():
    """ return the realm registered through init_multi() (None before that) """
    return realm
63
log = logging.getLogger('rgw_multi.tests')

# number of bucket names handed out so far by gen_bucket_name()
num_buckets = 0
# random six-letter prefix shared by every bucket name generated in this run
run_prefix = ''.join([random.choice(string.ascii_lowercase) for _ in range(6)])
68
def get_gateway_connection(gateway, credentials):
    """ connect to the given gateway

    The connection is opened on first use and cached on gateway.connection;
    later calls return the cached object.
    """
    # NOTE: this definition rebinds the get_gateway_connection name that the
    # module imports from .conn near the top of the file
    if gateway.connection is not None:
        return gateway.connection

    settings = {
        'aws_access_key_id': credentials.access_key,
        'aws_secret_access_key': credentials.secret,
        'host': gateway.host,
        'port': gateway.port,
        'is_secure': False,
        'calling_format': boto.s3.connection.OrdinaryCallingFormat(),
    }
    gateway.connection = boto.connect_s3(**settings)
    return gateway.connection
80
def get_zone_connection(zone, credentials):
    """ connect to the zone's first gateway

    When a list of credentials is given, only its first entry is used.
    """
    chosen = credentials[0] if isinstance(credentials, list) else credentials
    first_gateway = zone.gateways[0]
    return get_gateway_connection(first_gateway, chosen)
86
def mdlog_list(zone, period = None):
    """ run 'mdlog list' on the zone's cluster and return the parsed JSON

    The listing is restricted with --period when a period is given.
    """
    period_args = ['--period', period] if period else []
    output, _ = zone.cluster.admin(['mdlog', 'list'] + period_args, read_only=True)
    return json.loads(output.decode('utf-8'))
94
def meta_sync_status(zone):
    """ poll 'metadata sync status' until the admin command succeeds

    Retries every 5 seconds, without limit, while the command exits with
    ENOENT; any other non-zero exit code trips the assertion. The command
    output is read but not returned.

    NOTE(review): a second meta_sync_status is defined later in this module
    and rebinds the name once the module has loaded.
    """
    succeeded = False
    while not succeeded:
        status_cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
        _output, rc = zone.cluster.admin(status_cmd, check_retcode=False, read_only=True)
        succeeded = (rc == 0)
        if not succeeded:
            assert(rc == 2) # ENOENT
            time.sleep(5)
103
def mdlog_autotrim(zone):
    """ run 'mdlog autotrim' through the zone's cluster admin interface """
    autotrim_cmd = ['mdlog', 'autotrim']
    zone.cluster.admin(autotrim_cmd)
106
def datalog_list(zone, period = None):
    """ run 'datalog list' on the zone's cluster and return the parsed JSON

    The period argument is accepted but not used.
    """
    output, _ = zone.cluster.admin(['datalog', 'list'], read_only=True)
    return json.loads(output.decode('utf-8'))
112
def datalog_autotrim(zone):
    """ run 'datalog autotrim' through the zone's cluster admin interface """
    autotrim_cmd = ['datalog', 'autotrim']
    zone.cluster.admin(autotrim_cmd)
115
def bilog_list(zone, bucket, args = None):
    """ run 'bilog list' for the bucket and return the parsed JSON

    Extra command-line arguments may be passed in args. When a tenant is
    configured, --tenant and --uid are appended as well.
    """
    cmd = ['bilog', 'list', '--bucket', bucket] + (args if args else [])
    if config.tenant:
        cmd += ['--tenant', config.tenant, '--uid', user.name]
    output, _ = zone.cluster.admin(cmd, read_only=True)
    return json.loads(output.decode('utf-8'))
122
def bilog_autotrim(zone, args = None):
    """ run 'bilog autotrim', followed by any extra arguments given in args """
    extra_args = args if args else []
    zone.cluster.admin(['bilog', 'autotrim'] + extra_args)
125
def parse_meta_sync_status(meta_sync_status_json):
    """ parse the utf-8 encoded JSON output of 'metadata sync status'

    Returns (period, realm_epoch, num_shards, markers), where markers maps
    each shard index to its sync marker. A shard reports '' unless it holds
    an incremental marker for the same realm epoch.
    """
    text = meta_sync_status_json.decode('utf-8')
    log.debug('current meta sync status=%s', text)
    status = json.loads(text)['sync_status']

    info = status['info']
    _global_sync_status = info['status'] # looked up but not used further
    num_shards = info['num_shards']
    period = info['period']
    realm_epoch = info['realm_epoch']

    sync_markers = status['markers']
    log.debug('sync_markers=%s', sync_markers)
    assert(num_shards == len(sync_markers))

    def shard_marker(shard):
        # get marker, only if it's an incremental marker for the same realm epoch
        val = sync_markers[shard]['val']
        if realm_epoch > val['realm_epoch'] or val['state'] == 0:
            return ''
        return val['marker']

    markers = {shard: shard_marker(shard) for shard in range(num_shards)}
    return period, realm_epoch, num_shards, markers
150
def meta_sync_status(zone):
    """ read and parse the zone's metadata sync status

    The admin command is retried up to config.checkpoint_retries times,
    sleeping config.checkpoint_delay seconds after each ENOENT. Returns the
    tuple built by parse_meta_sync_status(), and fails the assertion when
    every attempt is used up or the command exits with another code.
    """
    for _attempt in range(config.checkpoint_retries):
        status_cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
        output, rc = zone.cluster.admin(status_cmd, check_retcode=False, read_only=True)
        if rc != 0:
            assert(rc == 2) # ENOENT
            time.sleep(config.checkpoint_delay)
            continue
        return parse_meta_sync_status(output)

    assert False, 'failed to read metadata sync status for zone=%s' % zone.name
161
def meta_master_log_status(master_zone):
    """ return the master zone's mdlog markers as {shard index: marker} """
    status_cmd = ['mdlog', 'status'] + master_zone.zone_args()
    output, _retcode = master_zone.cluster.admin(status_cmd, read_only=True)
    shards = json.loads(output.decode('utf-8'))

    markers = {}
    for shard_id, shard in enumerate(shards):
        markers[shard_id] = shard['marker']
    log.debug('master meta markers=%s', markers)
    return markers
170
def compare_meta_status(zone, log_status, sync_status):
    """ check whether the zone's metadata sync has caught up with the master

    log_status holds the master's markers and sync_status the zone's; their
    values are compared pairwise in iteration order. Returns False when the
    sizes differ or any master marker is greater than the zone's.
    """
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d, len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    lagging = []
    for shard, master_pos, target_pos in zip(log_status, log_status.values(), sync_status.values()):
        if master_pos > target_pos:
            lagging.append('shard=' + str(shard) + ' master=' + master_pos + ' target=' + target_pos)

    if not lagging:
        return True

    log.warning('zone %s behind master: %s', zone.name, ', '.join(lagging))
    return False
188
def zone_meta_checkpoint(zone, meta_master_zone = None, master_status = None):
    """ wait for the zone's metadata sync to catch up with the master

    The metadata master zone and its mdlog markers are looked up when not
    supplied. The zone's sync status is polled up to
    config.checkpoint_retries times, config.checkpoint_delay seconds apart;
    the assertion fails if the zone has not caught up by then.
    """
    meta_master_zone = meta_master_zone or zone.realm().meta_master_zone()
    master_status = master_status or meta_master_log_status(meta_master_zone)

    current_realm_epoch = realm.current_period.data['realm_epoch']

    log.info('starting meta checkpoint for zone=%s', zone.name)

    for _attempt in range(config.checkpoint_retries):
        _period, realm_epoch, _num_shards, sync_status = meta_sync_status(zone)
        if realm_epoch >= current_realm_epoch:
            log.debug('log_status=%s', master_status)
            log.debug('sync_status=%s', sync_status)
            if compare_meta_status(zone, master_status, sync_status):
                log.info('finish meta checkpoint for zone=%s', zone.name)
                return
        else:
            # markers from an older realm epoch are not compared at all
            log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d',
                        zone.name, realm_epoch, current_realm_epoch)

        time.sleep(config.checkpoint_delay)

    assert False, 'failed meta checkpoint for zone=%s' % zone.name
213
def zonegroup_meta_checkpoint(zonegroup, meta_master_zone = None, master_status = None):
    """ run a metadata checkpoint on every zone of the zonegroup

    The metadata master zone itself is skipped. The master zone and its
    mdlog markers are looked up when not supplied.
    """
    meta_master_zone = meta_master_zone or zonegroup.realm().meta_master_zone()
    master_status = master_status or meta_master_log_status(meta_master_zone)

    for member in zonegroup.zones:
        if not (member == meta_master_zone):
            zone_meta_checkpoint(member, meta_master_zone, master_status)
224
def realm_meta_checkpoint(realm):
    """ run a metadata checkpoint on every zonegroup of the current period

    The master's mdlog markers are read once and shared by all zonegroups.
    """
    log.info('meta checkpoint')

    master = realm.meta_master_zone()
    status = meta_master_log_status(master)

    for zg in realm.current_period.zonegroups:
        zonegroup_meta_checkpoint(zg, master, status)
233
def parse_data_sync_status(data_sync_status_json):
    """ parse the utf-8 encoded JSON output of 'data sync status'

    Returns (num_shards, markers), where markers maps each shard index to
    that shard's sync marker.
    """
    text = data_sync_status_json.decode('utf-8')
    log.debug('current data sync status=%s', text)
    status = json.loads(text)['sync_status']

    _global_sync_status = status['info']['status'] # looked up but not used further
    num_shards = status['info']['num_shards']

    sync_markers = status['markers']
    log.debug('sync_markers=%s', sync_markers)
    assert(num_shards == len(sync_markers))

    markers = {shard: sync_markers[shard]['val']['marker'] for shard in range(num_shards)}
    return (num_shards, markers)
251
def data_sync_status(target_zone, source_zone):
    """ read and parse target_zone's data sync status for source_zone

    Returns None when both zones are the same. Otherwise the admin command
    is retried up to config.checkpoint_retries times, sleeping
    config.checkpoint_delay seconds after each ENOENT, and the tuple built
    by parse_data_sync_status() is returned.
    """
    if target_zone == source_zone:
        return None

    for _attempt in range(config.checkpoint_retries):
        status_cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
        status_cmd += ['--source-zone', source_zone.name]
        output, rc = target_zone.cluster.admin(status_cmd, check_retcode=False, read_only=True)
        if rc != 0:
            assert(rc == 2) # ENOENT
            time.sleep(config.checkpoint_delay)
            continue
        return parse_data_sync_status(output)

    assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \
            (target_zone.name, source_zone.name)
268
def bucket_sync_status(target_zone, source_zone, bucket_name):
    """ read target_zone's bucket sync markers for a bucket of source_zone

    Returns None when both zones are the same. Otherwise returns a dict
    mapping each marker entry's key to its incremental sync position, with
    the shard id prefix removed; entries that are not in incremental sync
    map to ''.

    The admin command is retried while it exits with ENOENT, up to
    config.checkpoint_retries times with config.checkpoint_delay seconds
    between attempts, like the other sync status readers in this module.
    The assertion fails when the attempts run out or the command exits with
    any other code.
    """
    if target_zone == source_zone:
        return None

    cmd = ['bucket', 'sync', 'markers'] + target_zone.zone_args()
    cmd += ['--source-zone', source_zone.name]
    cmd += ['--bucket', bucket_name]
    if config.tenant:
        cmd += ['--tenant', config.tenant, '--uid', user.name]

    for _ in range(config.checkpoint_retries):
        bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break

        assert(retcode == 2) # ENOENT
        # back off before retrying instead of spinning on the admin command
        time.sleep(config.checkpoint_delay)
    else:
        assert False, 'failed to read bucket sync markers for target_zone=%s source_zone=%s bucket=%s' % \
                (target_zone.name, source_zone.name, bucket_name)

    bucket_sync_status_json = bucket_sync_status_json.decode('utf-8')
    log.debug('current bucket sync markers=%s', bucket_sync_status_json)
    sync_status = json.loads(bucket_sync_status_json)

    markers = {}
    for entry in sync_status:
        val = entry['val']
        if val['status'] == 'incremental-sync':
            # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
            pos = val['inc_marker']['position'].split('#')[-1]
        else:
            pos = ''
        markers[entry['key']] = pos

    return markers
298
def data_source_log_status(source_zone):
    """ return the source zone's datalog markers as {shard index: marker} """
    status_cmd = ['datalog', 'status'] + source_zone.zone_args()
    output, _retcode = source_zone.cluster.admin(status_cmd, read_only=True)
    shards = json.loads(output.decode('utf-8'))

    markers = {}
    for shard_id, shard in enumerate(shards):
        markers[shard_id] = shard['marker']
    log.debug('data markers for zone=%s markers=%s', source_zone.name, markers)
    return markers
308
def bucket_source_log_status(source_zone, bucket_name):
    """ return the source zone's bilog markers for the bucket

    Runs 'bilog status' and returns a dict mapping each marker entry's key
    to its val. An empty dict is returned when the parsed status has no
    'markers' member. When a tenant is configured, --tenant and --uid are
    added to the command.
    """
    cmd = ['bilog', 'status'] + source_zone.zone_args()
    cmd += ['--bucket', bucket_name]
    if config.tenant:
        cmd += ['--tenant', config.tenant, '--uid', user.name]
    bilog_status_json, _ = source_zone.cluster.admin(cmd, read_only=True)
    bilog_status = json.loads(bilog_status_json.decode('utf-8'))

    try:
        entries = bilog_status['markers']
    except (KeyError, TypeError):
        # best effort: a status without a 'markers' member (missing key, or
        # not a JSON object at all) simply yields no markers
        entries = []

    markers = {entry['key']: entry['val'] for entry in entries}

    log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers)
    return markers
331
def compare_data_status(target_zone, source_zone, log_status, sync_status):
    """ check whether target_zone's data sync has caught up with source_zone

    log_status holds the source's markers and sync_status the target's;
    their values are compared pairwise in iteration order. Returns False
    when the sizes differ or any source marker is greater than the target's.
    """
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    behind = ['shard=' + str(shard) + ' master=' + src_pos + ' target=' + dst_pos
              for shard, src_pos, dst_pos
              in zip(log_status, log_status.values(), sync_status.values())
              if src_pos > dst_pos]

    if behind:
        log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, ', '.join(behind))
        return False

    return True
349
def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
    """ check whether target_zone's sync of the bucket has caught up

    log_status holds the source's bilog markers and sync_status the
    target's; their values are compared pairwise in iteration order.
    Returns False when the sizes differ or any source marker is greater
    than the target's.
    """
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    report = []
    pairs = zip(log_status, log_status.values(), sync_status.values())
    for shard, source_pos, target_pos in pairs:
        if not source_pos > target_pos:
            continue
        report.append('shard=' + str(shard) + ' master=' + source_pos + ' target=' + target_pos)

    if len(report) == 0:
        return True

    log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, ', '.join(report))
    return False
367
def zone_data_checkpoint(target_zone, source_zone):
    """ wait for target_zone's data sync to catch up with source_zone

    Does nothing when both zones are the same. The source's datalog markers
    are read once, then the target's sync status is polled up to
    config.checkpoint_retries times, config.checkpoint_delay seconds apart;
    the assertion fails if the target has not caught up by then.
    """
    if target_zone == source_zone:
        return

    source_markers = data_source_log_status(source_zone)
    log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)

    for _attempt in range(config.checkpoint_retries):
        _num_shards, target_markers = data_sync_status(target_zone, source_zone)

        log.debug('log_status=%s', source_markers)
        log.debug('sync_status=%s', target_markers)

        caught_up = compare_data_status(target_zone, source_zone, source_markers, target_markers)
        if not caught_up:
            time.sleep(config.checkpoint_delay)
            continue

        log.info('finished data checkpoint for target_zone=%s source_zone=%s',
                 target_zone.name, source_zone.name)
        return

    assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
            (target_zone.name, source_zone.name)
389
def zonegroup_data_checkpoint(zonegroup_conns):
    """ run a data checkpoint from every writable zone to every other zone """
    for source_conn in zonegroup_conns.rw_zones:
        source = source_conn.zone
        for target_conn in zonegroup_conns.zones:
            if not (source_conn.zone == target_conn.zone):
                log.debug('data checkpoint: source=%s target=%s', source.name, target_conn.zone.name)
                zone_data_checkpoint(target_conn.zone, source)
397
def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
    """ wait for target_zone to catch up with source_zone on one bucket

    Does nothing when both zones are the same. The source's bilog markers
    are read once, then the target's bucket sync markers are polled up to
    config.checkpoint_retries times, config.checkpoint_delay seconds apart;
    the assertion fails if the target has not caught up by then.
    """
    if target_zone == source_zone:
        return

    source_markers = bucket_source_log_status(source_zone, bucket_name)
    log.info('starting bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)

    for _attempt in range(config.checkpoint_retries):
        target_markers = bucket_sync_status(target_zone, source_zone, bucket_name)

        log.debug('log_status=%s', source_markers)
        log.debug('sync_status=%s', target_markers)

        caught_up = compare_bucket_status(target_zone, source_zone, bucket_name, source_markers, target_markers)
        if not caught_up:
            time.sleep(config.checkpoint_delay)
            continue

        log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
        return

    assert False, 'failed bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \
            (target_zone.name, source_zone.name, bucket_name)
419
def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
    """ wait for a bucket to sync across the zonegroup, then compare copies

    A bucket checkpoint is run from every writable zone to every other
    zone. Afterwards each pair of zone connections is compared with
    check_bucket_eq(), provided the second zone of the pair has buckets.
    """
    for source_conn in zonegroup_conns.rw_zones:
        source = source_conn.zone
        for target_conn in zonegroup_conns.zones:
            if not (source_conn.zone == target_conn.zone):
                log.debug('bucket checkpoint: source=%s target=%s bucket=%s', source.name, target_conn.zone.name, bucket_name)
                zone_bucket_checkpoint(target_conn.zone, source, bucket_name)

    for first_conn, second_conn in combinations(zonegroup_conns.zones, 2):
        if not second_conn.zone.has_buckets():
            continue
        second_conn.check_bucket_eq(first_conn, bucket_name)
430
def set_master_zone(zone):
    """ make the zone its zonegroup's master and commit a period update

    Sleeps for config.reconfigure_delay seconds afterwards.
    """
    zone.modify(zone.cluster, ['--master'])
    zg = zone.zonegroup
    zg.period.update(zone, commit=True)
    zg.master_zone = zone
    delay = config.reconfigure_delay
    log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, delay)
    time.sleep(delay)
438
def set_sync_from_all(zone, flag):
    """ set the zone's sync-from-all option and commit a period update

    Sleeps for config.reconfigure_delay seconds afterwards.
    """
    if flag:
        setting = 'true'
    else:
        setting = 'false'
    zone.modify(zone.cluster, ['--sync-from-all=' + setting])
    zone.zonegroup.period.update(zone, commit=True)
    log.info('Set sync_from_all flag on zone %s to %s', zone.name, setting)
    time.sleep(config.reconfigure_delay)
446
def set_redirect_zone(zone, redirect_zone):
    """ point the zone's redirect-zone option at redirect_zone

    A falsy redirect_zone sets the option to the empty string. A period
    update is committed, followed by a config.reconfigure_delay second sleep.
    """
    if redirect_zone:
        target_id = redirect_zone.id
    else:
        target_id = ''
    zone.modify(zone.cluster, ['--redirect-zone={}'.format(target_id)])
    zone.zonegroup.period.update(zone, commit=True)
    log.info('Set redirect_zone zone %s to "%s"', zone.name, target_id)
    time.sleep(config.reconfigure_delay)
454
def enable_bucket_sync(zone, bucket_name):
    """ run 'bucket sync enable' for the bucket with the zone's arguments """
    enable_cmd = ['bucket', 'sync', 'enable', '--bucket', bucket_name]
    zone.cluster.admin(enable_cmd + zone.zone_args())
458
def disable_bucket_sync(zone, bucket_name):
    """ run 'bucket sync disable' for the bucket with the zone's arguments """
    disable_cmd = ['bucket', 'sync', 'disable', '--bucket', bucket_name]
    zone.cluster.admin(disable_cmd + zone.zone_args())
462
def check_buckets_sync_status_obj_not_exist(zone, buckets):
    """ wait until 'log list' no longer mentions any of the given buckets

    The output of 'log list' is searched for ':<bucket>:' for each bucket.
    The check is repeated up to config.checkpoint_retries times, sleeping
    config.checkpoint_delay seconds between attempts; the assertion fails
    if some bucket is still listed after the last attempt.
    """
    for _ in range(config.checkpoint_retries):
        cmd = ['log', 'list'] + zone.zone_arg()
        log_list, ret = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        # the admin output is bytes elsewhere in this module; decode it so
        # the str search below also works on python 3
        if isinstance(log_list, bytes):
            log_list = log_list.decode('utf-8')
        for bucket in buckets:
            if log_list.find(':'+bucket+":") >= 0:
                break
        else:
            # no bucket was found in the listing
            return
        time.sleep(config.checkpoint_delay)
    assert False, 'bucket sync status objects still listed for zone=%s' % zone.name
474
def gen_bucket_name():
    """ return the next bucket name: run_prefix, a dash and a running count """
    global num_buckets

    num_buckets = num_buckets + 1
    return '-'.join((run_prefix, str(num_buckets)))
480
class ZonegroupConns:
    """ connections, made with the test user's credentials, to every zone of a zonegroup

    zones holds all connections, ro_zones/rw_zones split them by whether
    the zone is read-only, and master_zone is the connection to the
    zonegroup's master zone (None if no zone matched).
    """
    def __init__(self, zonegroup):
        self.zonegroup = zonegroup
        self.zones, self.ro_zones, self.rw_zones = [], [], []
        self.master_zone = None
        for member in zonegroup.zones:
            conn = member.get_conn(user.credentials)
            self.zones.append(conn)

            same_access = self.ro_zones if member.is_read_only() else self.rw_zones
            same_access.append(conn)

            if member == zonegroup.master_zone:
                self.master_zone = conn
498
def check_all_buckets_exist(zone_conn, buckets):
    """ check that every bucket in buckets can be fetched through zone_conn

    Returns True right away when the zone has no buckets. Otherwise returns
    False, after logging the zone and bucket, for the first bucket whose
    lookup raises; returns True when every lookup succeeds.
    """
    if not zone_conn.zone.has_buckets():
        return True

    for b in buckets:
        try:
            zone_conn.get_bucket(b)
        except Exception:
            # any lookup failure counts as "bucket missing"
            log.critical('zone %s does not contain bucket %s', zone_conn.zone.name, b)
            return False

    return True
511
def check_all_buckets_dont_exist(zone_conn, buckets):
    """ check that none of the buckets can be fetched through zone_conn

    Returns True right away when the zone has no buckets. Otherwise returns
    False, after logging the zone and bucket, for the first bucket whose
    lookup succeeds; returns True when every lookup raises.
    """
    if not zone_conn.zone.has_buckets():
        return True

    for b in buckets:
        try:
            zone_conn.get_bucket(b)
        except Exception:
            # a failed lookup is the expected outcome here
            continue

        log.critical('zone %s contains bucket %s', zone_conn.zone.name, b)
        return False

    return True
526
def create_bucket_per_zone(zonegroup_conns, buckets_per_zone = 1):
    """ create buckets_per_zone new buckets on every writable zone

    Returns (buckets, zone_bucket): the list of generated bucket names, and
    a list of (zone connection, bucket) pairs where bucket is whatever the
    connection's create_bucket() returned.
    """
    buckets = []
    zone_bucket = []
    for zone in zonegroup_conns.rw_zones:
        # range() rather than xrange() so this also runs on python 3
        for _ in range(buckets_per_zone):
            bucket_name = gen_bucket_name()
            log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
            bucket = zone.create_bucket(bucket_name)
            buckets.append(bucket_name)
            zone_bucket.append((zone, bucket))

    return buckets, zone_bucket
539
def create_bucket_per_zone_in_realm():
    """ create one bucket on every writable zone of every zonegroup

    Covers the zonegroups of the realm's current period and returns the
    combined (buckets, zone_bucket) lists of create_bucket_per_zone().
    """
    all_names, all_pairs = [], []
    for zonegroup in realm.current_period.zonegroups:
        names, pairs = create_bucket_per_zone(ZonegroupConns(zonegroup))
        all_names += names
        all_pairs += pairs
    return all_names, all_pairs
549
def test_bucket_create():
    """ buckets created on the writable zones exist on every zone after meta sync """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    bucket_names, _ = create_bucket_per_zone(conns)
    zonegroup_meta_checkpoint(master_zg)

    for zone_conn in conns.zones:
        assert check_all_buckets_exist(zone_conn, bucket_names)
558
def test_bucket_recreate():
    """ re-creating existing buckets on every writable zone leaves them in place """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    bucket_names, _ = create_bucket_per_zone(conns)

    def assert_buckets_on_all_zones():
        for zone_conn in conns.zones:
            assert check_all_buckets_exist(zone_conn, bucket_names)

    zonegroup_meta_checkpoint(master_zg)
    assert_buckets_on_all_zones()

    # recreate buckets on all zones, make sure they weren't removed
    for zone_conn in conns.rw_zones:
        for name in bucket_names:
            zone_conn.create_bucket(name)
    assert_buckets_on_all_zones()

    zonegroup_meta_checkpoint(master_zg)
    assert_buckets_on_all_zones()
581
def test_bucket_remove():
    """ buckets deleted on their own zone are gone from every zone after meta sync """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    bucket_names, zone_bucket = create_bucket_per_zone(conns)
    zonegroup_meta_checkpoint(master_zg)

    for zone_conn in conns.zones:
        assert check_all_buckets_exist(zone_conn, bucket_names)

    # delete each bucket through the zone it was created on
    for owner_conn, bucket in zone_bucket:
        owner_conn.conn.delete_bucket(bucket)

    zonegroup_meta_checkpoint(master_zg)

    for zone_conn in conns.zones:
        assert check_all_buckets_dont_exist(zone_conn, bucket_names)
598
def get_bucket(zone, bucket_name):
    """ look the bucket up through the zone connection's underlying conn """
    connection = zone.conn
    return connection.get_bucket(bucket_name)
601
def get_key(zone, bucket_name, obj_name):
    """ fetch the bucket through the zone, then ask it for the named key """
    return get_bucket(zone, bucket_name).get_key(obj_name)
605
def new_key(zone, bucket_name, obj_name):
    """ fetch the bucket through the zone, then make a new key object in it """
    return get_bucket(zone, bucket_name).new_key(obj_name)
609
def check_bucket_eq(zone_conn1, zone_conn2, bucket):
    """ compare the bucket between two zone connections

    The comparison is delegated to zone_conn2.check_bucket_eq() and is
    skipped when zone_conn2's zone has no buckets.
    """
    if not zone_conn2.zone.has_buckets():
        return
    zone_conn2.check_bucket_eq(zone_conn1, bucket.name)
613
def test_object_sync():
    """ objects written to each zone's bucket show up on every other zone """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    _buckets, zone_bucket = create_bucket_per_zone(conns)

    content = 'asdasd'
    objnames = ('myobj', '_myobj', ':', '&')

    # don't wait for meta sync just yet
    for owner_conn, bucket in zone_bucket:
        for name in objnames:
            new_key(owner_conn, bucket, name).set_contents_from_string(content)

    zonegroup_meta_checkpoint(master_zg)

    for source_conn, bucket in zone_bucket:
        for target_conn in conns.zones:
            if not (source_conn.zone == target_conn.zone):
                zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
                check_bucket_eq(source_conn, target_conn, bucket)
637
def test_object_delete():
    """ an object's creation, and then its deletion, sync to every other zone """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    _buckets, zone_bucket = create_bucket_per_zone(conns)

    objname = 'myobj'
    content = 'asdasd'

    def sync_and_compare(source_conn, bucket):
        # checkpoint the bucket on every other zone and compare its contents
        for target_conn in conns.zones:
            if not (source_conn.zone == target_conn.zone):
                zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
                check_bucket_eq(source_conn, target_conn, bucket)

    # don't wait for meta sync just yet
    for owner_conn, bucket in zone_bucket:
        new_key(owner_conn, bucket, objname).set_contents_from_string(content)

    zonegroup_meta_checkpoint(master_zg)

    # check object exists
    for source_conn, bucket in zone_bucket:
        sync_and_compare(source_conn, bucket)

    # check object removal
    for source_conn, bucket in zone_bucket:
        get_key(source_conn, bucket, objname).delete()
        sync_and_compare(source_conn, bucket)
672
def get_latest_object_version(key):
    """ return the first listed version of the key flagged is_latest, or None """
    versions = key.bucket.list_versions(key.name)
    return next((version for version in versions if version.is_latest), None)
678
def test_versioned_object_incremental_sync():
    """ versioned object writes, deletes and acl changes sync incrementally """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    _buckets, zone_bucket = create_bucket_per_zone(conns)
    created = [bucket for _, bucket in zone_bucket]

    # enable versioning
    for bucket in created:
        bucket.configure_versioning(True)

    zonegroup_meta_checkpoint(master_zg)

    # upload a dummy object to each bucket and wait for sync. this forces each
    # bucket to finish a full sync and switch to incremental
    for source_conn, bucket in zone_bucket:
        new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
        for target_conn in conns.zones:
            if not (source_conn.zone == target_conn.zone):
                zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)

    for bucket in created:
        # create and delete multiple versions of an object from each zone
        for writer in conns.rw_zones:
            obj = 'obj-' + writer.name
            key = new_key(writer, bucket, obj)

            # don't delete version1 - this tests that the initial version
            # doesn't get squashed into later versions
            key.set_contents_from_string('version1')
            log.debug('version1 id=%s', key.version_id)

            # create and delete the following object versions to test that
            # the operations don't race with each other during sync
            key.set_contents_from_string('version2')
            log.debug('version2 id=%s', key.version_id)
            key.bucket.delete_key(obj, version_id=key.version_id)

            key.set_contents_from_string('version3')
            log.debug('version3 id=%s', key.version_id)
            key.bucket.delete_key(obj, version_id=key.version_id)

    for bucket in created:
        zonegroup_bucket_checkpoint(conns, bucket.name)

    for bucket in created:
        # overwrite the acls to test that metadata-only entries are applied
        for writer in conns.rw_zones:
            key = new_key(writer, bucket.name, 'obj-' + writer.name)
            get_latest_object_version(key).make_public()

    for bucket in created:
        zonegroup_bucket_checkpoint(conns, bucket.name)
733
def test_concurrent_versioned_object_incremental_sync():
    """ versions of one object written on every writable zone race to sync """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    first_writer = conns.rw_zones[0]

    # create a versioned bucket
    bucket = first_writer.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    bucket.configure_versioning(True)

    zonegroup_meta_checkpoint(master_zg)

    # upload a dummy object and wait for sync. this forces each zone to finish
    # a full sync and switch to incremental
    new_key(first_writer, bucket, 'dummy').set_contents_from_string('')
    zonegroup_bucket_checkpoint(conns, bucket.name)

    # create several concurrent versions on each zone and let them race to sync
    obj = 'obj'
    for _round in range(10):
        for writer in conns.rw_zones:
            key = new_key(writer, bucket, obj)
            key.set_contents_from_string('version1')
            log.debug('zone=%s version=%s', writer.zone.name, key.version_id)

    zonegroup_bucket_checkpoint(conns, bucket.name)
    zonegroup_data_checkpoint(conns)
761
def test_version_suspended_incremental_sync():
    """ one object is re-uploaded as versioning is enabled and then suspended """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)

    writer = conns.rw_zones[0]

    def upload_and_sync(message):
        # write an empty 'obj', log its version id, wait for the bucket to sync
        key = new_key(writer, bucket, 'obj')
        key.set_contents_from_string('')
        log.debug(message, key.version_id)
        zonegroup_bucket_checkpoint(conns, bucket.name)

    # create a non-versioned bucket
    bucket = writer.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(master_zg)

    # upload an initial object
    upload_and_sync('created initial version id=%s')

    # enable versioning
    bucket.configure_versioning(True)
    zonegroup_meta_checkpoint(master_zg)

    # re-upload the object as a new version
    upload_and_sync('created new version id=%s')

    # suspend versioning
    bucket.configure_versioning(False)
    zonegroup_meta_checkpoint(master_zg)

    # re-upload the object as a 'null' version
    upload_and_sync('created null version id=%s')
798
def test_delete_marker_full_sync():
    """ an object and its delete marker in versioned buckets reach every zone """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    _buckets, zone_bucket = create_bucket_per_zone(conns)
    created = [bucket for _, bucket in zone_bucket]

    # enable versioning
    for bucket in created:
        bucket.configure_versioning(True)
    zonegroup_meta_checkpoint(master_zg)

    for owner_conn, bucket in zone_bucket:
        # upload an initial object
        new_key(owner_conn, bucket, 'obj').set_contents_from_string('')

        # create a delete marker
        new_key(owner_conn, bucket, 'obj').delete()

    # wait for full sync
    for bucket in created:
        zonegroup_bucket_checkpoint(conns, bucket.name)
821
def test_suspended_delete_marker_full_sync():
    """ an object and its delete marker in versioning-suspended buckets reach every zone """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    _buckets, zone_bucket = create_bucket_per_zone(conns)
    created = [bucket for _, bucket in zone_bucket]

    # enable/suspend versioning
    for bucket in created:
        for enabled in (True, False):
            bucket.configure_versioning(enabled)
    zonegroup_meta_checkpoint(master_zg)

    for owner_conn, bucket in zone_bucket:
        # upload an initial object
        new_key(owner_conn, bucket, 'obj').set_contents_from_string('')

        # create a delete marker
        new_key(owner_conn, bucket, 'obj').delete()

    # wait for full sync
    for bucket in created:
        zonegroup_bucket_checkpoint(conns, bucket.name)
845
def test_bucket_versioning():
    """ enabling versioning on a bucket is reported back as 'Enabled' """
    _buckets, zone_bucket = create_bucket_per_zone_in_realm()
    field = 'Versioning'
    for _conn, bucket in zone_bucket:
        bucket.configure_versioning(True)
        status = bucket.get_versioning_status()
        assert(field in status and status[field] == 'Enabled')
853
def test_bucket_acl():
    """ setting the 'public-read' acl adds a second grant to a new bucket """
    _buckets, zone_bucket = create_bucket_per_zone_in_realm()

    def grant_count(bucket):
        return len(bucket.get_acl().acl.grants)

    for _conn, bucket in zone_bucket:
        assert(grant_count(bucket) == 1) # single grant on owner
        bucket.set_acl('public-read')
        assert(grant_count(bucket) == 2) # new grant on AllUsers
860
def test_bucket_cors():
    """ a cors configuration set on a bucket reads back as the same xml """
    _buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _conn, bucket in zone_bucket:
        wanted = CORSConfiguration()
        wanted.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000)
        bucket.set_cors(wanted)
        stored = bucket.get_cors()
        assert(stored.to_xml() == wanted.to_xml())
868
def test_bucket_delete_notempty():
    """ deleting a bucket that holds an object fails with BucketNotEmpty """
    master_zg = realm.master_zonegroup()
    conns = ZonegroupConns(master_zg)
    _buckets, zone_bucket = create_bucket_per_zone(conns)
    zonegroup_meta_checkpoint(master_zg)

    for owner_conn, bucket_name in zone_bucket:
        # upload an object to each bucket on its own zone
        conn = owner_conn.get_connection()
        conn.get_bucket(bucket_name).new_key('foo').set_contents_from_string('bar')
        # attempt to delete the bucket before this object can sync
        try:
            conn.delete_bucket(bucket_name)
        except boto.exception.S3ResponseError as err:
            assert(err.error_code == 'BucketNotEmpty')
        else:
            assert False # expected 409 BucketNotEmpty

    # assert that each bucket still exists on the master
    master_conn = conns.master_zone.conn
    for _conn, bucket_name in zone_bucket:
        assert master_conn.get_bucket(bucket_name)
893
def test_multi_period_incremental_sync():
    # Switch the master zone back and forth while zone 3 is stopped, creating
    # buckets in each period, then restart zone 3 and check that buckets match
    # across zones, that mdlogs match per period, and that autotrim empties
    # them.
    zonegroup = realm.master_zonegroup()
    if len(zonegroup.zones) < 3:
        raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")

    # periods to include in mdlog comparison
    mdlog_periods = [realm.current_period.id]

    # create a bucket in each zone
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    zonegroup_meta_checkpoint(zonegroup)

    z1, z2, z3 = zonegroup.zones[0:3]
    assert(z1 == zonegroup.master_zone)

    def create_bucket_on_running_zones():
        # create one more bucket through every zone connection except zone 3's
        # and record its name for the final comparison
        for zone_conn, _ in zone_bucket:
            if zone_conn.zone == z3:
                continue
            bucket_name = gen_bucket_name()
            log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
            zone_conn.conn.create_bucket(bucket_name)
            buckets.append(bucket_name)

    # kill zone 3 gateways to freeze sync status to incremental in first period
    z3.stop()

    # change master to zone 2 -> period 2
    set_master_zone(z2)
    mdlog_periods += [realm.current_period.id]

    create_bucket_on_running_zones()

    # wait for zone 1 to sync
    zone_meta_checkpoint(z1)

    # change master back to zone 1 -> period 3
    set_master_zone(z1)
    mdlog_periods += [realm.current_period.id]

    create_bucket_on_running_zones()

    # restart zone 3 gateway and wait for sync
    z3.start()
    zonegroup_meta_checkpoint(zonegroup)

    # verify that we end up with the same objects
    for bucket_name in buckets:
        for source_conn, _ in zone_bucket:
            for target_conn in zonegroup_conns.zones:
                if source_conn.zone == target_conn.zone:
                    continue

                if target_conn.zone.has_buckets():
                    target_conn.check_bucket_eq(source_conn, bucket_name)

    # verify that mdlogs are not empty and match for each period
    for period in mdlog_periods:
        master_mdlog = mdlog_list(z1, period)
        assert len(master_mdlog) > 0
        for zone in zonegroup.zones:
            if zone == z1:
                continue
            mdlog = mdlog_list(zone, period)
            assert len(mdlog) == len(master_mdlog)

    # autotrim mdlogs for master zone
    mdlog_autotrim(z1)

    # autotrim mdlogs for peers
    for zone in zonegroup.zones:
        if zone == z1:
            continue
        mdlog_autotrim(zone)

    # verify that mdlogs are empty for each period
    for period in mdlog_periods:
        for zone in zonegroup.zones:
            mdlog = mdlog_list(zone, period)
            assert len(mdlog) == 0
979
def test_datalog_autotrim():
    # Write one object per zone, wait for data sync, then check that autotrim
    # leaves each zone's datalog listing empty.
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    _names, per_zone = create_bucket_per_zone(zonegroup_conns)

    # upload an object to each zone to generate a datalog entry
    for conn, bkt in per_zone:
        obj = new_key(conn, bkt.name, 'key')
        obj.set_contents_from_string('body')

    # wait for data sync to catch up
    zonegroup_data_checkpoint(zonegroup_conns)

    # trim each datalog, then list it: nothing may remain
    for conn, _bkt in per_zone:
        datalog_autotrim(conn.zone)
        remaining = datalog_list(conn.zone)
        assert len(remaining) == 0
998
def test_multi_zone_redirect():
    # After set_sync_from_all(z2, False), an object written through the first
    # zone must not be readable through the second zone until
    # set_redirect_zone(z2, z1) is applied; afterwards reads through the
    # second zone must return the current contents.
    zonegroup = realm.master_zonegroup()
    if len(zonegroup.rw_zones) < 2:
        raise SkipTest("test_multi_zone_redirect skipped. Requires 2 or more zones in master zonegroup.")

    zonegroup_conns = ZonegroupConns(zonegroup)
    (zc1, zc2) = zonegroup_conns.rw_zones[0:2]

    z1, z2 = (zc1.zone, zc2.zone)

    set_sync_from_all(z2, False)
    # set just before the redirect is configured, so cleanup only resets it
    # when this test actually touched it
    redirect_touched = False
    try:
        # create a bucket on the first zone
        bucket_name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', z1.name, bucket_name)
        bucket = zc1.conn.create_bucket(bucket_name)
        obj = 'testredirect'

        key = bucket.new_key(obj)
        data = 'A'*512
        key.set_contents_from_string(data)

        zonegroup_meta_checkpoint(zonegroup)

        # try to read object from second zone (should fail)
        bucket2 = get_bucket(zc2, bucket_name)
        assert_raises(boto.exception.S3ResponseError, bucket2.get_key, obj)

        redirect_touched = True
        set_redirect_zone(z2, z1)

        key2 = bucket2.get_key(obj)

        eq(data, key2.get_contents_as_string())

        key = bucket.new_key(obj)

        # every overwrite on the first zone must be visible via the second
        for x in ['a', 'b', 'c', 'd']:
            data = x*512
            key.set_contents_from_string(data)
            eq(data, key2.get_contents_as_string())
    finally:
        # revert config changes even when a check above fails, so the zone
        # settings changed here do not carry over into later tests
        set_sync_from_all(z2, True)
        if redirect_touched:
            set_redirect_zone(z2, None)
1043
def test_zonegroup_remove():
    # Add a new zone to the master zonegroup, remove it from the zonegroup,
    # and check that removing it a second time fails with ENOENT.
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    if len(zonegroup.zones) < 2:
        raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")

    zonegroup_meta_checkpoint(zonegroup)
    z1, z2 = zonegroup.zones[0:2]
    c1, c2 = (z1.cluster, z2.cluster)

    # get admin credentials out of existing zone
    system_key = z1.data['system_key']
    admin_creds = Credentials(system_key['access_key'], system_key['secret_key'])

    # create a new zone in zonegroup on c2 and commit
    zone = Zone('remove', zonegroup, c2)
    zone.create(c2, admin_creds.credential_args())
    zonegroup.zones.append(zone)
    zonegroup.period.update(zone, commit=True)

    zonegroup.remove(c1, zone)

    # another 'zonegroup remove' should fail with ENOENT; compare against the
    # errno constant, as the EINVAL check elsewhere in this file does
    _, retcode = zonegroup.remove(c1, zone, check_retcode=False)
    assert(retcode == errno.ENOENT)

    # delete the new zone
    zone.delete(c2)

    # validate the resulting period
    zonegroup.period.update(z1, commit=True)
1075
1076
def test_zg_master_zone_delete():
    # Create a throwaway zonegroup with one zone, delete that zone, and check
    # that the next period update is rejected with EINVAL until the zonegroup
    # itself is deleted.
    master_zg = realm.master_zonegroup()
    master_zone = master_zg.master_zone

    assert len(master_zg.zones) >= 1
    cluster = master_zg.zones[0].cluster

    doomed_zg = ZoneGroup('remove_zg')
    doomed_zg.create(cluster)

    doomed_zone = Zone('remove', doomed_zg, cluster)
    doomed_zone.create(cluster)
    master_zg.period.update(master_zone, commit=True)

    doomed_zone.delete(cluster)
    # Period update: this should now fail, as the deleted zone would be the
    # master zone in that zonegroup
    update_result = master_zg.period.update(master_zone, check_retcode=False)
    _, retcode = update_result
    assert retcode == errno.EINVAL

    # Deleting the zonegroup as well leaves no dangling master_zone in the
    # period, so this update must succeed
    doomed_zg.delete(cluster)
    master_zg.period.update(master_zone, commit=True)
1103
def test_set_bucket_website():
    # Set a website configuration on a bucket in every zone and check that it
    # reads back as the same XML. Skips when the gateway answers
    # MethodNotAllowed.
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        website_cfg = WebsiteConfiguration(suffix='index.html',error_key='error.html')
        try:
            bucket.set_website_configuration(website_cfg)
        except boto.exception.S3ResponseError as e:
            if e.error_code == 'MethodNotAllowed':
                raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.")
            # any other S3 error is a real failure: surface it here instead of
            # swallowing it and failing later on the read-back check
            raise
        assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml())
1114
def test_set_bucket_policy():
    # Store a policy document on a bucket in every zone and check that
    # get_policy() returns exactly the text that was stored.
    policy = '''{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": "*"
}]
}'''
    _names, per_zone = create_bucket_per_zone_in_realm()
    for _conn, bkt in per_zone:
        bkt.set_policy(policy)
        stored = bkt.get_policy()
        assert stored == policy
1127
def test_bucket_sync_disable():
    # Disable sync for every test bucket via the metadata master zone, run the
    # sync-status-object check on each zone, then wait for a data checkpoint.
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, _per_zone = create_bucket_per_zone(zonegroup_conns)

    for name in buckets:
        meta_master = realm.meta_master_zone()
        disable_bucket_sync(meta_master, name)

    for member in zonegroup.zones:
        check_buckets_sync_status_obj_not_exist(member, buckets)

    zonegroup_data_checkpoint(zonegroup_conns)
1140
def test_bucket_sync_enable_right_after_disable():
    # Disable and immediately re-enable sync on each bucket between two rounds
    # of uploads, and check that bucket and data checkpoints are still
    # reached.
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, per_zone = create_bucket_per_zone(zonegroup_conns)

    content = 'asdasd'

    def upload_to_every_zone(names):
        # write each named object into every zone's bucket
        for conn, bkt in per_zone:
            for objname in names:
                obj = new_key(conn, bkt.name, objname)
                obj.set_contents_from_string(content)

    def checkpoint_every_bucket():
        for name in buckets:
            zonegroup_bucket_checkpoint(zonegroup_conns, name)

    # first round of objects, fully synced before sync is toggled
    upload_to_every_zone(['obj1', 'obj2', 'obj3', 'obj4'])
    checkpoint_every_bucket()

    # toggle sync off and straight back on for each bucket
    for name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), name)
        enable_bucket_sync(realm.meta_master_zone(), name)

    # second round of objects, written after the toggle
    upload_to_every_zone(['obj5', 'obj6', 'obj7', 'obj8'])
    checkpoint_every_bucket()

    zonegroup_data_checkpoint(zonegroup_conns)
1172
def test_bucket_sync_disable_enable():
    # Upload objects while bucket sync is disabled, re-enable it, and check
    # that bucket and data checkpoints are reached afterwards.
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, per_zone = create_bucket_per_zone(zonegroup_conns)

    content = 'asdasd'

    def upload_to_every_zone(names):
        # write each named object into every zone's bucket
        for conn, bkt in per_zone:
            for objname in names:
                obj = new_key(conn, bkt.name, objname)
                obj.set_contents_from_string(content)

    def checkpoint_every_bucket():
        for name in buckets:
            zonegroup_bucket_checkpoint(zonegroup_conns, name)

    # first round of objects, synced while sync is still enabled
    upload_to_every_zone(['obj1', 'obj2', 'obj3', 'obj4'])

    zonegroup_meta_checkpoint(zonegroup)
    checkpoint_every_bucket()

    for name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), name)

    zonegroup_meta_checkpoint(zonegroup)

    # second round of objects, written while sync is disabled
    upload_to_every_zone(['obj5', 'obj6', 'obj7', 'obj8'])

    for name in buckets:
        enable_bucket_sync(realm.meta_master_zone(), name)

    checkpoint_every_bucket()

    zonegroup_data_checkpoint(zonegroup_conns)
1210
def test_multipart_object_sync():
    # Complete a four-part multipart upload into the first zone's bucket and
    # wait for that bucket to reach a checkpoint across the zonegroup.
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    _names, per_zone = create_bucket_per_zone(zonegroup_conns)

    _, bucket = per_zone[0]

    # initiate a multipart upload
    initiated = bucket.initiate_multipart_upload('MULTIPART')
    mp = boto.s3.multipart.MultiPartUpload(bucket)
    mp.key_name = initiated.key_name
    mp.id = initiated.id

    part_size = 5 * 1024 * 1024  # 5M min part size
    # parts 1..4 are filled with 'a', 'b', 'c', 'd' respectively
    for part_num, fill in enumerate('abcd', 1):
        mp.upload_part_from_file(StringIO(fill * part_size), part_num)
    mp.complete_upload()

    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
1231
def test_encrypted_object_sync():
    # Upload one SSE-C and one SSE-KMS object through the first zone and check
    # that both read back with the same contents through the second zone.
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    if len(zonegroup.rw_zones) < 2:
        raise SkipTest("test_encrypted_object_sync skipped. Requires 2 or more zones in master zonegroup.")

    (zone1, zone2) = zonegroup_conns.rw_zones[0:2]

    # create a bucket on the first zone
    bucket_name = gen_bucket_name()
    log.info('create bucket zone=%s name=%s', zone1.name, bucket_name)
    bucket = zone1.conn.create_bucket(bucket_name)

    # upload an object with sse-c encryption
    sse_c_headers = {
        'x-amz-server-side-encryption-customer-algorithm': 'AES256',
        'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
        'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
    }
    key = bucket.new_key('testobj-sse-c')
    data = 'A'*512
    key.set_contents_from_string(data, headers=sse_c_headers)

    # upload an object with sse-kms encryption
    sse_kms_headers = {
        'x-amz-server-side-encryption': 'aws:kms',
        # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this)
        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
    }
    key = bucket.new_key('testobj-sse-kms')
    key.set_contents_from_string(data, headers=sse_kms_headers)

    # wait for the bucket metadata and data to sync
    zonegroup_meta_checkpoint(zonegroup)
    zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name)

    # read the encrypted objects from the second zone; the sse-c object needs
    # the same customer-key headers on the read as on the write
    bucket2 = get_bucket(zone2, bucket_name)
    key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers)
    eq(data, key.get_contents_as_string(headers=sse_c_headers))

    key = bucket2.get_key('testobj-sse-kms')
    eq(data, key.get_contents_as_string())
1276
def test_bucket_index_log_trim():
    # Run bilog autotrim with different bucket limits and check which of a
    # 'cold' and an 'active' bucket ends up with an empty bucket index log.
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    zone = zonegroup_conns.rw_zones[0]

    def make_synced_bucket():
        # create a test bucket, upload some objects, and wait for sync
        name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone.name, name)
        created = zone.conn.create_bucket(name)
        for objname in ('a', 'b', 'c', 'd'):
            new_key(zone, name, objname).set_contents_from_string('foo')
        zonegroup_meta_checkpoint(zonegroup)
        zonegroup_bucket_checkpoint(zonegroup_conns, name)
        return created

    def autotrim(*options):
        # run bilog autotrim on the test zone with the given extra arguments
        bilog_autotrim(zone.zone, list(options))

    # create a 'cold' bucket
    cold_bucket = make_synced_bucket()

    # trim with max-buckets=0 to clear counters for cold bucket. this should
    # prevent it from being considered 'active' by the next autotrim
    autotrim('--rgw-sync-log-trim-max-buckets', '0')

    # create an 'active' bucket
    active_bucket = make_synced_bucket()

    # trim with max-buckets=1 min-cold-buckets=0 to trim active bucket only
    autotrim(
        '--rgw-sync-log-trim-max-buckets', '1',
        '--rgw-sync-log-trim-min-cold-buckets', '0'
    )

    # verify active bucket has empty bilog
    active_entries = bilog_list(zone.zone, active_bucket.name)
    assert len(active_entries) == 0

    # verify cold bucket has nonempty bilog
    cold_entries = bilog_list(zone.zone, cold_bucket.name)
    assert len(cold_entries) > 0

    # trim with min-cold-buckets=999 to trim all buckets
    autotrim(
        '--rgw-sync-log-trim-max-buckets', '999',
        '--rgw-sync-log-trim-min-cold-buckets', '999'
    )

    # verify cold bucket has empty bilog
    cold_entries = bilog_list(zone.zone, cold_bucket.name)
    assert len(cold_entries) == 0
1330
def test_bucket_creation_time():
    # Compare the bucket listings of every pair of zones: buckets at the same
    # position must agree on name and creation date. zip() stops at the
    # shorter listing, so any extra entries in the longer one go unchecked.
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    listings = []
    for conn in zonegroup_conns.rw_zones:
        listings.append(conn.get_connection().get_all_buckets())

    for left, right in combinations(listings, 2):
        for a, b in zip(left, right):
            eq(a.name, b.name)
            eq(a.creation_date, b.creation_date)