]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/rgw_multi/tests.py
update sources to v12.2.1
[ceph.git] / ceph / src / test / rgw / rgw_multi / tests.py
1 import json
2 import random
3 import string
4 import sys
5 import time
6 import logging
7
8 try:
9 from itertools import izip_longest as zip_longest
10 except ImportError:
11 from itertools import zip_longest
12 from itertools import combinations
13
14 import boto
15 import boto.s3.connection
16 from boto.s3.website import WebsiteConfiguration
17 from boto.s3.cors import CORSConfiguration
18
19 from nose.tools import eq_ as eq
20 from nose.plugins.attrib import attr
21 from nose.plugins.skip import SkipTest
22
23 from .multisite import Zone
24
25 from .conn import get_gateway_connection
26
27 class Config:
28 """ test configuration """
29 def __init__(self, **kwargs):
30 # by default, wait up to 5 minutes before giving up on a sync checkpoint
31 self.checkpoint_retries = kwargs.get('checkpoint_retries', 60)
32 self.checkpoint_delay = kwargs.get('checkpoint_delay', 5)
33 # allow some time for realm reconfiguration after changing master zone
34 self.reconfigure_delay = kwargs.get('reconfigure_delay', 5)
35
36 # rgw multisite tests, written against the interfaces provided in rgw_multi.
37 # these tests must be initialized and run by another module that provides
38 # implementations of these interfaces by calling init_multi()
39 realm = None
40 user = None
41 config = None
42 def init_multi(_realm, _user, _config=None):
43 global realm
44 realm = _realm
45 global user
46 user = _user
47 global config
48 config = _config or Config()
49 realm_meta_checkpoint(realm)
50
51 def get_realm():
52 return realm
53
54 log = logging.getLogger(__name__)
55
56 num_buckets = 0
57 run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
58
59 def get_gateway_connection(gateway, credentials):
60 """ connect to the given gateway """
61 if gateway.connection is None:
62 gateway.connection = boto.connect_s3(
63 aws_access_key_id = credentials.access_key,
64 aws_secret_access_key = credentials.secret,
65 host = gateway.host,
66 port = gateway.port,
67 is_secure = False,
68 calling_format = boto.s3.connection.OrdinaryCallingFormat())
69 return gateway.connection
70
71 def get_zone_connection(zone, credentials):
72 """ connect to the zone's first gateway """
73 if isinstance(credentials, list):
74 credentials = credentials[0]
75 return get_gateway_connection(zone.gateways[0], credentials)
76
77 def mdlog_list(zone, period = None):
78 cmd = ['mdlog', 'list']
79 if period:
80 cmd += ['--period', period]
81 (mdlog_json, _) = zone.cluster.admin(cmd, read_only=True)
82 mdlog_json = mdlog_json.decode('utf-8')
83 return json.loads(mdlog_json)
84
85 def meta_sync_status(zone):
86 while True:
87 cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
88 meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
89 if retcode == 0:
90 break
91 assert(retcode == 2) # ENOENT
92 time.sleep(5)
93
94 def mdlog_autotrim(zone):
95 zone.cluster.admin(['mdlog', 'autotrim'])
96
97 def parse_meta_sync_status(meta_sync_status_json):
98 meta_sync_status_json = meta_sync_status_json.decode('utf-8')
99 log.debug('current meta sync status=%s', meta_sync_status_json)
100 sync_status = json.loads(meta_sync_status_json)
101
102 sync_info = sync_status['sync_status']['info']
103 global_sync_status = sync_info['status']
104 num_shards = sync_info['num_shards']
105 period = sync_info['period']
106 realm_epoch = sync_info['realm_epoch']
107
108 sync_markers=sync_status['sync_status']['markers']
109 log.debug('sync_markers=%s', sync_markers)
110 assert(num_shards == len(sync_markers))
111
112 markers={}
113 for i in range(num_shards):
114 # get marker, only if it's an incremental marker for the same realm epoch
115 if realm_epoch > sync_markers[i]['val']['realm_epoch'] or sync_markers[i]['val']['state'] == 0:
116 markers[i] = ''
117 else:
118 markers[i] = sync_markers[i]['val']['marker']
119
120 return period, realm_epoch, num_shards, markers
121
122 def meta_sync_status(zone):
123 for _ in range(config.checkpoint_retries):
124 cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
125 meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
126 if retcode == 0:
127 return parse_meta_sync_status(meta_sync_status_json)
128 assert(retcode == 2) # ENOENT
129 time.sleep(config.checkpoint_delay)
130
131 assert False, 'failed to read metadata sync status for zone=%s' % zone.name
132
133 def meta_master_log_status(master_zone):
134 cmd = ['mdlog', 'status'] + master_zone.zone_args()
135 mdlog_status_json, retcode = master_zone.cluster.admin(cmd, read_only=True)
136 mdlog_status = json.loads(mdlog_status_json.decode('utf-8'))
137
138 markers = {i: s['marker'] for i, s in enumerate(mdlog_status)}
139 log.debug('master meta markers=%s', markers)
140 return markers
141
142 def compare_meta_status(zone, log_status, sync_status):
143 if len(log_status) != len(sync_status):
144 log.error('len(log_status)=%d, len(sync_status)=%d', len(log_status), len(sync_status))
145 return False
146
147 msg = ''
148 for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
149 if l > s:
150 if len(msg):
151 msg += ', '
152 msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
153
154 if len(msg) > 0:
155 log.warning('zone %s behind master: %s', zone.name, msg)
156 return False
157
158 return True
159
160 def zone_meta_checkpoint(zone, meta_master_zone = None, master_status = None):
161 if not meta_master_zone:
162 meta_master_zone = zone.realm().meta_master_zone()
163 if not master_status:
164 master_status = meta_master_log_status(meta_master_zone)
165
166 current_realm_epoch = realm.current_period.data['realm_epoch']
167
168 log.info('starting meta checkpoint for zone=%s', zone.name)
169
170 for _ in range(config.checkpoint_retries):
171 period, realm_epoch, num_shards, sync_status = meta_sync_status(zone)
172 if realm_epoch < current_realm_epoch:
173 log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d',
174 zone.name, realm_epoch, current_realm_epoch)
175 else:
176 log.debug('log_status=%s', master_status)
177 log.debug('sync_status=%s', sync_status)
178 if compare_meta_status(zone, master_status, sync_status):
179 log.info('finish meta checkpoint for zone=%s', zone.name)
180 return
181
182 time.sleep(config.checkpoint_delay)
183 assert False, 'failed meta checkpoint for zone=%s' % zone.name
184
185 def zonegroup_meta_checkpoint(zonegroup, meta_master_zone = None, master_status = None):
186 if not meta_master_zone:
187 meta_master_zone = zonegroup.realm().meta_master_zone()
188 if not master_status:
189 master_status = meta_master_log_status(meta_master_zone)
190
191 for zone in zonegroup.zones:
192 if zone == meta_master_zone:
193 continue
194 zone_meta_checkpoint(zone, meta_master_zone, master_status)
195
196 def realm_meta_checkpoint(realm):
197 log.info('meta checkpoint')
198
199 meta_master_zone = realm.meta_master_zone()
200 master_status = meta_master_log_status(meta_master_zone)
201
202 for zonegroup in realm.current_period.zonegroups:
203 zonegroup_meta_checkpoint(zonegroup, meta_master_zone, master_status)
204
205 def parse_data_sync_status(data_sync_status_json):
206 data_sync_status_json = data_sync_status_json.decode('utf-8')
207 log.debug('current data sync status=%s', data_sync_status_json)
208 sync_status = json.loads(data_sync_status_json)
209
210 global_sync_status=sync_status['sync_status']['info']['status']
211 num_shards=sync_status['sync_status']['info']['num_shards']
212
213 sync_markers=sync_status['sync_status']['markers']
214 log.debug('sync_markers=%s', sync_markers)
215 assert(num_shards == len(sync_markers))
216
217 markers={}
218 for i in range(num_shards):
219 markers[i] = sync_markers[i]['val']['marker']
220
221 return (num_shards, markers)
222
223 def data_sync_status(target_zone, source_zone):
224 if target_zone == source_zone:
225 return None
226
227 for _ in range(config.checkpoint_retries):
228 cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
229 cmd += ['--source-zone', source_zone.name]
230 data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
231 if retcode == 0:
232 return parse_data_sync_status(data_sync_status_json)
233
234 assert(retcode == 2) # ENOENT
235 time.sleep(config.checkpoint_delay)
236
237 assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \
238 (target_zone.name, source_zone.name)
239
240 def bucket_sync_status(target_zone, source_zone, bucket_name):
241 if target_zone == source_zone:
242 return None
243
244 cmd = ['bucket', 'sync', 'status'] + target_zone.zone_args()
245 cmd += ['--source-zone', source_zone.name]
246 cmd += ['--bucket', bucket_name]
247 while True:
248 bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
249 if retcode == 0:
250 break
251
252 assert(retcode == 2) # ENOENT
253
254 bucket_sync_status_json = bucket_sync_status_json.decode('utf-8')
255 log.debug('current bucket sync status=%s', bucket_sync_status_json)
256 sync_status = json.loads(bucket_sync_status_json)
257
258 markers={}
259 for entry in sync_status:
260 val = entry['val']
261 if val['status'] == 'incremental-sync':
262 pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
263 else:
264 pos = ''
265 markers[entry['key']] = pos
266
267 return markers
268
269 def data_source_log_status(source_zone):
270 source_cluster = source_zone.cluster
271 cmd = ['datalog', 'status'] + source_zone.zone_args()
272 datalog_status_json, retcode = source_cluster.rgw_admin(cmd, read_only=True)
273 datalog_status = json.loads(datalog_status_json.decode('utf-8'))
274
275 markers = {i: s['marker'] for i, s in enumerate(datalog_status)}
276 log.debug('data markers for zone=%s markers=%s', source_zone.name, markers)
277 return markers
278
279 def bucket_source_log_status(source_zone, bucket_name):
280 cmd = ['bilog', 'status'] + source_zone.zone_args()
281 cmd += ['--bucket', bucket_name]
282 source_cluster = source_zone.cluster
283 bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
284 bilog_status = json.loads(bilog_status_json.decode('utf-8'))
285
286 m={}
287 markers={}
288 try:
289 m = bilog_status['markers']
290 except:
291 pass
292
293 for s in m:
294 key = s['key']
295 val = s['val']
296 markers[key] = val
297
298 log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers)
299 return markers
300
301 def compare_data_status(target_zone, source_zone, log_status, sync_status):
302 if len(log_status) != len(sync_status):
303 log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
304 return False
305
306 msg = ''
307 for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
308 if l > s:
309 if len(msg):
310 msg += ', '
311 msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
312
313 if len(msg) > 0:
314 log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, msg)
315 return False
316
317 return True
318
319 def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
320 if len(log_status) != len(sync_status):
321 log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
322 return False
323
324 msg = ''
325 for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
326 if l > s:
327 if len(msg):
328 msg += ', '
329 msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
330
331 if len(msg) > 0:
332 log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, msg)
333 return False
334
335 return True
336
337 def zone_data_checkpoint(target_zone, source_zone_conn):
338 if target_zone == source_zone:
339 return
340
341 log_status = data_source_log_status(source_zone)
342 log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)
343
344 for _ in range(config.checkpoint_retries):
345 num_shards, sync_status = data_sync_status(target_zone, source_zone)
346
347 log.debug('log_status=%s', log_status)
348 log.debug('sync_status=%s', sync_status)
349
350 if compare_data_status(target_zone, source_zone, log_status, sync_status):
351 log.info('finished data checkpoint for target_zone=%s source_zone=%s',
352 target_zone.name, source_zone.name)
353 return
354 time.sleep(config.checkpoint_delay)
355
356 assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
357 (target_zone.name, source_zone.name)
358
359
360 def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
361 if target_zone == source_zone:
362 return
363
364 log_status = bucket_source_log_status(source_zone, bucket_name)
365 log.info('starting bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
366
367 for _ in range(config.checkpoint_retries):
368 sync_status = bucket_sync_status(target_zone, source_zone, bucket_name)
369
370 log.debug('log_status=%s', log_status)
371 log.debug('sync_status=%s', sync_status)
372
373 if compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
374 log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
375 return
376
377 time.sleep(config.checkpoint_delay)
378
379 assert False, 'finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \
380 (target_zone.name, source_zone.name, bucket_name)
381
382 def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
383 for source_conn in zonegroup_conns.zones:
384 for target_conn in zonegroup_conns.zones:
385 if source_conn.zone == target_conn.zone:
386 continue
387 zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket_name)
388 target_conn.check_bucket_eq(source_conn, bucket_name)
389
390 def set_master_zone(zone):
391 zone.modify(zone.cluster, ['--master'])
392 zonegroup = zone.zonegroup
393 zonegroup.period.update(zone, commit=True)
394 zonegroup.master_zone = zone
395 log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay)
396 time.sleep(config.reconfigure_delay)
397
398 def enable_bucket_sync(zone, bucket_name):
399 cmd = ['bucket', 'sync', 'enable', '--bucket', bucket_name] + zone.zone_args()
400 zone.cluster.admin(cmd)
401
402 def disable_bucket_sync(zone, bucket_name):
403 cmd = ['bucket', 'sync', 'disable', '--bucket', bucket_name] + zone.zone_args()
404 zone.cluster.admin(cmd)
405
406 def check_buckets_sync_status_obj_not_exist(zone, buckets):
407 for _ in range(config.checkpoint_retries):
408 cmd = ['log', 'list'] + zone.zone_arg()
409 log_list, ret = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
410 for bucket in buckets:
411 if log_list.find(':'+bucket+":") >= 0:
412 break
413 else:
414 return
415 time.sleep(config.checkpoint_delay)
416 assert False
417
418 def gen_bucket_name():
419 global num_buckets
420
421 num_buckets += 1
422 return run_prefix + '-' + str(num_buckets)
423
424 class ZonegroupConns:
425 def __init__(self, zonegroup):
426 self.zonegroup = zonegroup
427 self.zones = []
428 self.ro_zones = []
429 self.rw_zones = []
430 self.master_zone = None
431 for z in zonegroup.zones:
432 zone_conn = z.get_conn(user.credentials)
433 self.zones.append(zone_conn)
434 if z.is_read_only():
435 self.ro_zones.append(zone_conn)
436 else:
437 self.rw_zones.append(zone_conn)
438
439 if z == zonegroup.master_zone:
440 self.master_zone = zone_conn
441
442 def check_all_buckets_exist(zone_conn, buckets):
443 if not zone_conn.zone.has_buckets():
444 return True
445
446 for b in buckets:
447 try:
448 zone_conn.get_bucket(b)
449 except:
450 log.critical('zone %s does not contain bucket %s', zone.name, b)
451 return False
452
453 return True
454
455 def check_all_buckets_dont_exist(zone_conn, buckets):
456 if not zone_conn.zone.has_buckets():
457 return True
458
459 for b in buckets:
460 try:
461 zone_conn.get_bucket(b)
462 except:
463 continue
464
465 log.critical('zone %s contains bucket %s', zone.zone, b)
466 return False
467
468 return True
469
470 def create_bucket_per_zone(zonegroup_conns, buckets_per_zone = 1):
471 buckets = []
472 zone_bucket = []
473 for zone in zonegroup_conns.rw_zones:
474 for i in xrange(buckets_per_zone):
475 bucket_name = gen_bucket_name()
476 log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
477 bucket = zone.create_bucket(bucket_name)
478 buckets.append(bucket_name)
479 zone_bucket.append((zone, bucket))
480
481 return buckets, zone_bucket
482
483 def create_bucket_per_zone_in_realm():
484 buckets = []
485 zone_bucket = []
486 for zonegroup in realm.current_period.zonegroups:
487 zg_conn = ZonegroupConns(zonegroup)
488 b, z = create_bucket_per_zone(zg_conn)
489 buckets.extend(b)
490 zone_bucket.extend(z)
491 return buckets, zone_bucket
492
493 def test_bucket_create():
494 zonegroup = realm.master_zonegroup()
495 zonegroup_conns = ZonegroupConns(zonegroup)
496 buckets, _ = create_bucket_per_zone(zonegroup_conns)
497 zonegroup_meta_checkpoint(zonegroup)
498
499 for zone in zonegroup_conns.zones:
500 assert check_all_buckets_exist(zone, buckets)
501
502 def test_bucket_recreate():
503 zonegroup = realm.master_zonegroup()
504 zonegroup_conns = ZonegroupConns(zonegroup)
505 buckets, _ = create_bucket_per_zone(zonegroup_conns)
506 zonegroup_meta_checkpoint(zonegroup)
507
508
509 for zone in zonegroup_conns.zones:
510 assert check_all_buckets_exist(zone, buckets)
511
512 # recreate buckets on all zones, make sure they weren't removed
513 for zone in zonegroup_conns.rw_zones:
514 for bucket_name in buckets:
515 bucket = zone.create_bucket(bucket_name)
516
517 for zone in zonegroup_conns.zones:
518 assert check_all_buckets_exist(zone, buckets)
519
520 zonegroup_meta_checkpoint(zonegroup)
521
522 for zone in zonegroup_conns.zones:
523 assert check_all_buckets_exist(zone, buckets)
524
525 def test_bucket_remove():
526 zonegroup = realm.master_zonegroup()
527 zonegroup_conns = ZonegroupConns(zonegroup)
528 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
529 zonegroup_meta_checkpoint(zonegroup)
530
531 for zone in zonegroup_conns.zones:
532 assert check_all_buckets_exist(zone, buckets)
533
534 for zone, bucket_name in zone_bucket:
535 zone.conn.delete_bucket(bucket_name)
536
537 zonegroup_meta_checkpoint(zonegroup)
538
539 for zone in zonegroup_conns.zones:
540 assert check_all_buckets_dont_exist(zone, buckets)
541
542 def get_bucket(zone, bucket_name):
543 return zone.conn.get_bucket(bucket_name)
544
545 def get_key(zone, bucket_name, obj_name):
546 b = get_bucket(zone, bucket_name)
547 return b.get_key(obj_name)
548
549 def new_key(zone, bucket_name, obj_name):
550 b = get_bucket(zone, bucket_name)
551 return b.new_key(obj_name)
552
553 def check_bucket_eq(zone_conn1, zone_conn2, bucket):
554 return zone_conn2.check_bucket_eq(zone_conn1, bucket.name)
555
556 def test_object_sync():
557 zonegroup = realm.master_zonegroup()
558 zonegroup_conns = ZonegroupConns(zonegroup)
559 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
560
561 objnames = [ 'myobj', '_myobj', ':', '&' ]
562 content = 'asdasd'
563
564 # don't wait for meta sync just yet
565 for zone, bucket_name in zone_bucket:
566 for objname in objnames:
567 k = new_key(zone, bucket_name, objname)
568 k.set_contents_from_string(content)
569
570 zonegroup_meta_checkpoint(zonegroup)
571
572 for source_conn, bucket in zone_bucket:
573 for target_conn in zonegroup_conns.zones:
574 if source_conn.zone == target_conn.zone:
575 continue
576
577 zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
578 check_bucket_eq(source_conn, target_conn, bucket)
579
580 def test_object_delete():
581 zonegroup = realm.master_zonegroup()
582 zonegroup_conns = ZonegroupConns(zonegroup)
583 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
584
585 objname = 'myobj'
586 content = 'asdasd'
587
588 # don't wait for meta sync just yet
589 for zone, bucket in zone_bucket:
590 k = new_key(zone, bucket, objname)
591 k.set_contents_from_string(content)
592
593 zonegroup_meta_checkpoint(zonegroup)
594
595 # check object exists
596 for source_conn, bucket in zone_bucket:
597 for target_conn in zonegroup_conns.zones:
598 if source_conn.zone == target_conn.zone:
599 continue
600
601 zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
602 check_bucket_eq(source_conn, target_conn, bucket)
603
604 # check object removal
605 for source_conn, bucket in zone_bucket:
606 k = get_key(source_conn, bucket, objname)
607 k.delete()
608 for target_conn in zonegroup_conns.zones:
609 if source_conn.zone == target_conn.zone:
610 continue
611
612 zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
613 check_bucket_eq(source_conn, target_conn, bucket)
614
615 def get_latest_object_version(key):
616 for k in key.bucket.list_versions(key.name):
617 if k.is_latest:
618 return k
619 return None
620
621 def test_versioned_object_incremental_sync():
622 zonegroup = realm.master_zonegroup()
623 zonegroup_conns = ZonegroupConns(zonegroup)
624 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
625
626 # enable versioning
627 for _, bucket in zone_bucket:
628 bucket.configure_versioning(True)
629
630 zonegroup_meta_checkpoint(zonegroup)
631
632 # upload a dummy object to each bucket and wait for sync. this forces each
633 # bucket to finish a full sync and switch to incremental
634 for source_conn, bucket in zone_bucket:
635 new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
636 for target_conn in zonegroup_conns.zones:
637 if source_conn.zone == target_conn.zone:
638 continue
639 zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
640
641 for _, bucket in zone_bucket:
642 # create and delete multiple versions of an object from each zone
643 for zone_conn in zonegroup_conns.rw_zones:
644 obj = 'obj-' + zone_conn.name
645 k = new_key(zone_conn, bucket, obj)
646
647 k.set_contents_from_string('version1')
648 v = get_latest_object_version(k)
649 log.debug('version1 id=%s', v.version_id)
650 # don't delete version1 - this tests that the initial version
651 # doesn't get squashed into later versions
652
653 # create and delete the following object versions to test that
654 # the operations don't race with each other during sync
655 k.set_contents_from_string('version2')
656 v = get_latest_object_version(k)
657 log.debug('version2 id=%s', v.version_id)
658 k.bucket.delete_key(obj, version_id=v.version_id)
659
660 k.set_contents_from_string('version3')
661 v = get_latest_object_version(k)
662 log.debug('version3 id=%s', v.version_id)
663 k.bucket.delete_key(obj, version_id=v.version_id)
664
665 for source_conn, bucket in zone_bucket:
666 for target_conn in zonegroup_conns.zones:
667 if source_conn.zone == target_conn.zone:
668 continue
669 zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
670 check_bucket_eq(source_conn, target_conn, bucket)
671
672 def test_bucket_versioning():
673 buckets, zone_bucket = create_bucket_per_zone_in_realm()
674 for _, bucket in zone_bucket:
675 bucket.configure_versioning(True)
676 res = bucket.get_versioning_status()
677 key = 'Versioning'
678 assert(key in res and res[key] == 'Enabled')
679
680 def test_bucket_acl():
681 buckets, zone_bucket = create_bucket_per_zone_in_realm()
682 for _, bucket in zone_bucket:
683 assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
684 bucket.set_acl('public-read')
685 assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers
686
687 def test_bucket_cors():
688 buckets, zone_bucket = create_bucket_per_zone_in_realm()
689 for _, bucket in zone_bucket:
690 cors_cfg = CORSConfiguration()
691 cors_cfg.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000)
692 bucket.set_cors(cors_cfg)
693 assert(bucket.get_cors().to_xml() == cors_cfg.to_xml())
694
695 def test_bucket_delete_notempty():
696 zonegroup = realm.master_zonegroup()
697 zonegroup_conns = ZonegroupConns(zonegroup)
698 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
699 zonegroup_meta_checkpoint(zonegroup)
700
701 for zone_conn, bucket_name in zone_bucket:
702 # upload an object to each bucket on its own zone
703 conn = zone_conn.get_connection()
704 bucket = conn.get_bucket(bucket_name)
705 k = bucket.new_key('foo')
706 k.set_contents_from_string('bar')
707 # attempt to delete the bucket before this object can sync
708 try:
709 conn.delete_bucket(bucket_name)
710 except boto.exception.S3ResponseError as e:
711 assert(e.error_code == 'BucketNotEmpty')
712 continue
713 assert False # expected 409 BucketNotEmpty
714
715 # assert that each bucket still exists on the master
716 c1 = zonegroup_conns.master_zone.conn
717 for _, bucket_name in zone_bucket:
718 assert c1.get_bucket(bucket_name)
719
720 def test_multi_period_incremental_sync():
721 zonegroup = realm.master_zonegroup()
722 if len(zonegroup.zones) < 3:
723 raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")
724
725 # periods to include in mdlog comparison
726 mdlog_periods = [realm.current_period.id]
727
728 # create a bucket in each zone
729 zonegroup_conns = ZonegroupConns(zonegroup)
730 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
731
732 zonegroup_meta_checkpoint(zonegroup)
733
734 z1, z2, z3 = zonegroup.zones[0:3]
735 assert(z1 == zonegroup.master_zone)
736
737 # kill zone 3 gateways to freeze sync status to incremental in first period
738 z3.stop()
739
740 # change master to zone 2 -> period 2
741 set_master_zone(z2)
742 mdlog_periods += [realm.current_period.id]
743
744 for zone_conn, _ in zone_bucket:
745 if zone_conn.zone == z3:
746 continue
747 bucket_name = gen_bucket_name()
748 log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
749 bucket = zone_conn.conn.create_bucket(bucket_name)
750 buckets.append(bucket_name)
751
752 # wait for zone 1 to sync
753 zone_meta_checkpoint(z1)
754
755 # change master back to zone 1 -> period 3
756 set_master_zone(z1)
757 mdlog_periods += [realm.current_period.id]
758
759 for zone_conn, bucket_name in zone_bucket:
760 if zone_conn.zone == z3:
761 continue
762 bucket_name = gen_bucket_name()
763 log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
764 bucket = zone_conn.conn.create_bucket(bucket_name)
765 buckets.append(bucket_name)
766
767 # restart zone 3 gateway and wait for sync
768 z3.start()
769 zonegroup_meta_checkpoint(zonegroup)
770
771 # verify that we end up with the same objects
772 for bucket_name in buckets:
773 for source_conn, _ in zone_bucket:
774 for target_conn in zonegroup_conns.zones:
775 if source_conn.zone == target_conn.zone:
776 continue
777
778 target_conn.check_bucket_eq(source_conn, bucket_name)
779
780 # verify that mdlogs are not empty and match for each period
781 for period in mdlog_periods:
782 master_mdlog = mdlog_list(z1, period)
783 assert len(master_mdlog) > 0
784 for zone in zonegroup.zones:
785 if zone == z1:
786 continue
787 mdlog = mdlog_list(zone, period)
788 assert len(mdlog) == len(master_mdlog)
789
790 # autotrim mdlogs for master zone
791 mdlog_autotrim(z1)
792
793 # autotrim mdlogs for peers
794 for zone in zonegroup.zones:
795 if zone == z1:
796 continue
797 mdlog_autotrim(zone)
798
799 # verify that mdlogs are empty for each period
800 for period in mdlog_periods:
801 for zone in zonegroup.zones:
802 mdlog = mdlog_list(zone, period)
803 assert len(mdlog) == 0
804
805 def test_zonegroup_remove():
806 zonegroup = realm.master_zonegroup()
807 zonegroup_conns = ZonegroupConns(zonegroup)
808 if len(zonegroup.zones) < 2:
809 raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")
810
811 zonegroup_meta_checkpoint(zonegroup)
812 z1, z2 = zonegroup.zones[0:2]
813 c1, c2 = (z1.cluster, z2.cluster)
814
815 # create a new zone in zonegroup on c2 and commit
816 zone = Zone('remove', zonegroup, c2)
817 zone.create(c2)
818 zonegroup.zones.append(zone)
819 zonegroup.period.update(zone, commit=True)
820
821 zonegroup.remove(c1, zone)
822
823 # another 'zonegroup remove' should fail with ENOENT
824 _, retcode = zonegroup.remove(c1, zone, check_retcode=False)
825 assert(retcode == 2) # ENOENT
826
827 # delete the new zone
828 zone.delete(c2)
829
830 # validate the resulting period
831 zonegroup.period.update(z1, commit=True)
832
833 def test_set_bucket_website():
834 buckets, zone_bucket = create_bucket_per_zone_in_realm()
835 for _, bucket in zone_bucket:
836 website_cfg = WebsiteConfiguration(suffix='index.html',error_key='error.html')
837 try:
838 bucket.set_website_configuration(website_cfg)
839 except boto.exception.S3ResponseError as e:
840 if e.error_code == 'MethodNotAllowed':
841 raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.")
842 assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml())
843
844 def test_set_bucket_policy():
845 policy = '''{
846 "Version": "2012-10-17",
847 "Statement": [{
848 "Effect": "Allow",
849 "Principal": "*"
850 }]
851 }'''
852 buckets, zone_bucket = create_bucket_per_zone_in_realm()
853 for _, bucket in zone_bucket:
854 bucket.set_policy(policy)
855 assert(bucket.get_policy() == policy)
856
857 def test_bucket_sync_disable():
858 zonegroup = realm.master_zonegroup()
859 zonegroup_conns = ZonegroupConns(zonegroup)
860 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
861
862 for bucket_name in buckets:
863 disable_bucket_sync(realm.meta_master_zone(), bucket_name)
864
865 for zone in zonegroup.zones:
866 check_buckets_sync_status_obj_not_exist(zone, buckets)
867
868 def test_bucket_sync_enable_right_after_disable():
869 zonegroup = realm.master_zonegroup()
870 zonegroup_conns = ZonegroupConns(zonegroup)
871 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
872
873 objnames = ['obj1', 'obj2', 'obj3', 'obj4']
874 content = 'asdasd'
875
876 for zone, bucket in zone_bucket:
877 for objname in objnames:
878 k = new_key(zone, bucket.name, objname)
879 k.set_contents_from_string(content)
880
881 for bucket_name in buckets:
882 zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
883
884 for bucket_name in buckets:
885 disable_bucket_sync(realm.meta_master_zone(), bucket_name)
886 enable_bucket_sync(realm.meta_master_zone(), bucket_name)
887
888 objnames_2 = ['obj5', 'obj6', 'obj7', 'obj8']
889
890 for zone, bucket in zone_bucket:
891 for objname in objnames_2:
892 k = new_key(zone, bucket.name, objname)
893 k.set_contents_from_string(content)
894
895 for bucket_name in buckets:
896 zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
897
898 def test_bucket_sync_disable_enable():
899 zonegroup = realm.master_zonegroup()
900 zonegroup_conns = ZonegroupConns(zonegroup)
901 buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
902
903 objnames = [ 'obj1', 'obj2', 'obj3', 'obj4' ]
904 content = 'asdasd'
905
906 for zone, bucket in zone_bucket:
907 for objname in objnames:
908 k = new_key(zone, bucket.name, objname)
909 k.set_contents_from_string(content)
910
911 zonegroup_meta_checkpoint(zonegroup)
912
913 for bucket_name in buckets:
914 zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
915
916 for bucket_name in buckets:
917 disable_bucket_sync(realm.meta_master_zone(), bucket_name)
918
919 zonegroup_meta_checkpoint(zonegroup)
920
921 objnames_2 = [ 'obj5', 'obj6', 'obj7', 'obj8' ]
922
923 for zone, bucket in zone_bucket:
924 for objname in objnames_2:
925 k = new_key(zone, bucket.name, objname)
926 k.set_contents_from_string(content)
927
928 for bucket_name in buckets:
929 enable_bucket_sync(realm.meta_master_zone(), bucket_name)
930
931 for bucket_name in buckets:
932 zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
933
934 def test_encrypted_object_sync():
935 zonegroup = realm.master_zonegroup()
936 zonegroup_conns = ZonegroupConns(zonegroup)
937
938 (zone1, zone2) = zonegroup_conns.rw_zones[0:2]
939
940 # create a bucket on the first zone
941 bucket_name = gen_bucket_name()
942 log.info('create bucket zone=%s name=%s', zone1.name, bucket_name)
943 bucket = zone1.conn.create_bucket(bucket_name)
944
945 # upload an object with sse-c encryption
946 sse_c_headers = {
947 'x-amz-server-side-encryption-customer-algorithm': 'AES256',
948 'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
949 'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
950 }
951 key = bucket.new_key('testobj-sse-c')
952 data = 'A'*512
953 key.set_contents_from_string(data, headers=sse_c_headers)
954
955 # upload an object with sse-kms encryption
956 sse_kms_headers = {
957 'x-amz-server-side-encryption': 'aws:kms',
958 # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this)
959 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
960 }
961 key = bucket.new_key('testobj-sse-kms')
962 key.set_contents_from_string(data, headers=sse_kms_headers)
963
964 # wait for the bucket metadata and data to sync
965 zonegroup_meta_checkpoint(zonegroup)
966 zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name)
967
968 # read the encrypted objects from the second zone
969 bucket2 = get_bucket(zone2, bucket_name)
970 key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers)
971 eq(data, key.get_contents_as_string(headers=sse_c_headers))
972
973 key = bucket2.get_key('testobj-sse-kms')
974 eq(data, key.get_contents_as_string())