# ceph/src/test/rgw/rgw_multi/tests.py
import json
import logging
import random
import string
import time

try:
    # Python 2
    from itertools import izip_longest as zip_longest
except ImportError:
    # Python 3
    from itertools import zip_longest
from itertools import combinations

try:
    from cStringIO import StringIO  # Python 2
except ImportError:
    from io import StringIO  # Python 3

import boto.s3.connection
import boto.s3.multipart
from boto.s3.website import WebsiteConfiguration
from boto.s3.cors import CORSConfiguration

from nose.tools import eq_ as eq
from nose.plugins.attrib import attr
from nose.plugins.skip import SkipTest

from .multisite import Zone
from .conn import get_gateway_connection

29 """ test configuration """
30 def __init__(self
, **kwargs
):
31 # by default, wait up to 5 minutes before giving up on a sync checkpoint
32 self
.checkpoint_retries
= kwargs
.get('checkpoint_retries', 60)
33 self
.checkpoint_delay
= kwargs
.get('checkpoint_delay', 5)
34 # allow some time for realm reconfiguration after changing master zone
35 self
.reconfigure_delay
= kwargs
.get('reconfigure_delay', 5)
# rgw multisite tests, written against the interfaces provided in rgw_multi.
# these tests must be initialized and run by another module that provides
# implementations of these interfaces by calling init_multi()
realm = None
user = None
config = None

def init_multi(_realm, _user, _config=None):
    global realm
    realm = _realm
    global user
    user = _user
    global config
    config = _config or Config()
    realm_meta_checkpoint(realm)
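
# A minimal sketch (not part of this module) of how a driver might wire the
# tests up; the realm/user construction shown is an assumption:
#
#   from rgw_multi import tests
#   realm = ...  # an rgw_multi.multisite.Realm built by the test harness
#   user = ...   # a user whose .credentials carry access_key/secret
#   tests.init_multi(realm, user, tests.Config(checkpoint_retries=120))
#   tests.test_bucket_create()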
log = logging.getLogger(__name__)

num_buckets = 0
run_prefix = ''.join(random.choice(string.ascii_lowercase) for _ in range(6))

def get_gateway_connection(gateway, credentials):
    """ connect to the given gateway """
    if gateway.connection is None:
        gateway.connection = boto.connect_s3(
            aws_access_key_id=credentials.access_key,
            aws_secret_access_key=credentials.secret,
            host=gateway.host,
            port=gateway.port,
            is_secure=False,
            calling_format=boto.s3.connection.OrdinaryCallingFormat())
    return gateway.connection
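
# note: OrdinaryCallingFormat forces path-style requests, so buckets resolve
# against the gateway's host:port without needing per-bucket DNS entries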
def get_zone_connection(zone, credentials):
    """ connect to the zone's first gateway """
    if isinstance(credentials, list):
        credentials = credentials[0]
    return get_gateway_connection(zone.gateways[0], credentials)

def mdlog_list(zone, period=None):
    cmd = ['mdlog', 'list']
    if period:
        cmd += ['--period', period]
    (mdlog_json, _) = zone.cluster.admin(cmd, read_only=True)
    mdlog_json = mdlog_json.decode('utf-8')
    return json.loads(mdlog_json)

def meta_sync_status(zone):
    # note: shadowed by the retrying meta_sync_status() defined further below
    cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
    meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
    if retcode == 0:
        return parse_meta_sync_status(meta_sync_status_json)
    assert(retcode == 2) # ENOENT

def mdlog_autotrim(zone):
    zone.cluster.admin(['mdlog', 'autotrim'])

def bilog_list(zone, bucket, args=None):
    cmd = ['bilog', 'list', '--bucket', bucket] + (args or [])
    bilog, _ = zone.cluster.admin(cmd, read_only=True)
    bilog = bilog.decode('utf-8')
    return json.loads(bilog)

def bilog_autotrim(zone, args=None):
    zone.cluster.admin(['bilog', 'autotrim'] + (args or []))

def parse_meta_sync_status(meta_sync_status_json):
    meta_sync_status_json = meta_sync_status_json.decode('utf-8')
    log.debug('current meta sync status=%s', meta_sync_status_json)
    sync_status = json.loads(meta_sync_status_json)

    sync_info = sync_status['sync_status']['info']
    global_sync_status = sync_info['status']
    num_shards = sync_info['num_shards']
    period = sync_info['period']
    realm_epoch = sync_info['realm_epoch']

    sync_markers = sync_status['sync_status']['markers']
    log.debug('sync_markers=%s', sync_markers)
    assert(num_shards == len(sync_markers))

    markers = {}
    for i in range(num_shards):
        # get marker, only if it's an incremental marker for the same realm epoch
        if realm_epoch > sync_markers[i]['val']['realm_epoch'] or sync_markers[i]['val']['state'] == 0:
            markers[i] = ''
        else:
            markers[i] = sync_markers[i]['val']['marker']

    return period, realm_epoch, num_shards, markers
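
# For reference, the 'metadata sync status' JSON parsed above is assumed to be
# shaped roughly like this (field values illustrative only):
#
#   {
#     "sync_status": {
#       "info": { "status": "incremental-sync", "num_shards": 2,
#                 "period": "<period id>", "realm_epoch": 2 },
#       "markers": [
#         {"key": 0, "val": {"state": 1, "marker": "...", "realm_epoch": 2}},
#         {"key": 1, "val": {"state": 1, "marker": "...", "realm_epoch": 2}}
#       ]
#     }
#   }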
def meta_sync_status(zone):
    for _ in range(config.checkpoint_retries):
        cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
        meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            return parse_meta_sync_status(meta_sync_status_json)
        assert(retcode == 2) # ENOENT
        time.sleep(config.checkpoint_delay)

    assert False, 'failed to read metadata sync status for zone=%s' % zone.name

def meta_master_log_status(master_zone):
    cmd = ['mdlog', 'status'] + master_zone.zone_args()
    mdlog_status_json, retcode = master_zone.cluster.admin(cmd, read_only=True)
    mdlog_status = json.loads(mdlog_status_json.decode('utf-8'))

    markers = {i: s['marker'] for i, s in enumerate(mdlog_status)}
    log.debug('master meta markers=%s', markers)
    return markers

def compare_meta_status(zone, log_status, sync_status):
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d, len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg):
        log.warning('zone %s behind master: %s', zone.name, msg)
        return False

    return True
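
# A minimal illustration of the comparison above (assumed marker values, not
# taken from a live cluster): markers compare lexicographically, so
#
#   log_status  = {0: '00000000002.132.3', 1: '00000000005.140.2'}
#   sync_status = {0: '00000000002.132.3', 1: '00000000004.139.1'}
#
# reports shard 1 behind and returns False until the target catches up.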
def zone_meta_checkpoint(zone, meta_master_zone=None, master_status=None):
    if not meta_master_zone:
        meta_master_zone = zone.realm().meta_master_zone()
    if not master_status:
        master_status = meta_master_log_status(meta_master_zone)

    current_realm_epoch = realm.current_period.data['realm_epoch']

    log.info('starting meta checkpoint for zone=%s', zone.name)

    for _ in range(config.checkpoint_retries):
        period, realm_epoch, num_shards, sync_status = meta_sync_status(zone)
        if realm_epoch < current_realm_epoch:
            log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d',
                        zone.name, realm_epoch, current_realm_epoch)
        else:
            log.debug('log_status=%s', master_status)
            log.debug('sync_status=%s', sync_status)
            if compare_meta_status(zone, master_status, sync_status):
                log.info('finish meta checkpoint for zone=%s', zone.name)
                return

        time.sleep(config.checkpoint_delay)

    assert False, 'failed meta checkpoint for zone=%s' % zone.name

def zonegroup_meta_checkpoint(zonegroup, meta_master_zone=None, master_status=None):
    if not meta_master_zone:
        meta_master_zone = zonegroup.realm().meta_master_zone()
    if not master_status:
        master_status = meta_master_log_status(meta_master_zone)

    for zone in zonegroup.zones:
        if zone == meta_master_zone:
            continue
        zone_meta_checkpoint(zone, meta_master_zone, master_status)

def realm_meta_checkpoint(realm):
    log.info('meta checkpoint')

    meta_master_zone = realm.meta_master_zone()
    master_status = meta_master_log_status(meta_master_zone)

    for zonegroup in realm.current_period.zonegroups:
        zonegroup_meta_checkpoint(zonegroup, meta_master_zone, master_status)

def parse_data_sync_status(data_sync_status_json):
    data_sync_status_json = data_sync_status_json.decode('utf-8')
    log.debug('current data sync status=%s', data_sync_status_json)
    sync_status = json.loads(data_sync_status_json)

    global_sync_status = sync_status['sync_status']['info']['status']
    num_shards = sync_status['sync_status']['info']['num_shards']

    sync_markers = sync_status['sync_status']['markers']
    log.debug('sync_markers=%s', sync_markers)
    assert(num_shards == len(sync_markers))

    markers = {}
    for i in range(num_shards):
        markers[i] = sync_markers[i]['val']['marker']

    return (num_shards, markers)

def data_sync_status(target_zone, source_zone):
    if target_zone == source_zone:
        return None

    for _ in range(config.checkpoint_retries):
        cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
        cmd += ['--source-zone', source_zone.name]
        data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            return parse_data_sync_status(data_sync_status_json)

        assert(retcode == 2) # ENOENT
        time.sleep(config.checkpoint_delay)

    assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \
                  (target_zone.name, source_zone.name)

def bucket_sync_status(target_zone, source_zone, bucket_name):
    if target_zone == source_zone:
        return None

    cmd = ['bucket', 'sync', 'markers'] + target_zone.zone_args()
    cmd += ['--source-zone', source_zone.name]
    cmd += ['--bucket', bucket_name]
    while True:
        bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break
        assert(retcode == 2) # ENOENT

    bucket_sync_status_json = bucket_sync_status_json.decode('utf-8')
    log.debug('current bucket sync markers=%s', bucket_sync_status_json)
    sync_status = json.loads(bucket_sync_status_json)

    markers = {}
    for entry in sync_status:
        val = entry['val']
        if val['status'] == 'incremental-sync':
            pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
        else:
            pos = ''
        markers[entry['key']] = pos

    return markers
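
# The 'bucket sync markers' JSON consumed above is assumed to be a list of
# per-shard entries shaped roughly like (values illustrative):
#
#   [{"key": 0, "val": {"status": "incremental-sync",
#                       "inc_marker": {"position": "6#00000000002.132.3"}}},
#    ...]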
def data_source_log_status(source_zone):
    source_cluster = source_zone.cluster
    cmd = ['datalog', 'status'] + source_zone.zone_args()
    datalog_status_json, retcode = source_cluster.rgw_admin(cmd, read_only=True)
    datalog_status = json.loads(datalog_status_json.decode('utf-8'))

    markers = {i: s['marker'] for i, s in enumerate(datalog_status)}
    log.debug('data markers for zone=%s markers=%s', source_zone.name, markers)
    return markers

def bucket_source_log_status(source_zone, bucket_name):
    cmd = ['bilog', 'status'] + source_zone.zone_args()
    cmd += ['--bucket', bucket_name]
    source_cluster = source_zone.cluster
    bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
    bilog_status = json.loads(bilog_status_json.decode('utf-8'))

    # collect the marker for each shard; entries are assumed to be
    # {key, val} pairs keyed by shard
    m = bilog_status['markers']
    markers = {entry['key']: entry['val'] for entry in m}

    log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers)
    return markers

def compare_data_status(target_zone, source_zone, log_status, sync_status):
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg):
        log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, msg)
        return False

    return True

def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg):
        log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, msg)
        return False

    return True

def zone_data_checkpoint(target_zone, source_zone):
    if target_zone == source_zone:
        return

    log_status = data_source_log_status(source_zone)
    log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)

    for _ in range(config.checkpoint_retries):
        num_shards, sync_status = data_sync_status(target_zone, source_zone)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        if compare_data_status(target_zone, source_zone, log_status, sync_status):
            log.info('finished data checkpoint for target_zone=%s source_zone=%s',
                     target_zone.name, source_zone.name)
            return

        time.sleep(config.checkpoint_delay)

    assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
                  (target_zone.name, source_zone.name)

def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
    if target_zone == source_zone:
        return

    log_status = bucket_source_log_status(source_zone, bucket_name)
    log.info('starting bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)

    for _ in range(config.checkpoint_retries):
        sync_status = bucket_sync_status(target_zone, source_zone, bucket_name)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        if compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
            log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
            return

        time.sleep(config.checkpoint_delay)

    assert False, 'failed bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \
                  (target_zone.name, source_zone.name, bucket_name)

def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
    for source_conn in zonegroup_conns.zones:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket_name)
    for source_conn, target_conn in combinations(zonegroup_conns.zones, 2):
        target_conn.check_bucket_eq(source_conn, bucket_name)

def set_master_zone(zone):
    zone.modify(zone.cluster, ['--master'])
    zonegroup = zone.zonegroup
    zonegroup.period.update(zone, commit=True)
    zonegroup.master_zone = zone
    log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay)
    time.sleep(config.reconfigure_delay)
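
# Each set_master_zone() call commits a new period; the failover/failback in
# test_multi_period_incremental_sync below uses it as, e.g.:
#
#   set_master_zone(z2)   # master z1 -> z2, starts period 2
#   set_master_zone(z1)   # master z2 -> z1, starts period 3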
def enable_bucket_sync(zone, bucket_name):
    cmd = ['bucket', 'sync', 'enable', '--bucket', bucket_name] + zone.zone_args()
    zone.cluster.admin(cmd)

def disable_bucket_sync(zone, bucket_name):
    cmd = ['bucket', 'sync', 'disable', '--bucket', bucket_name] + zone.zone_args()
    zone.cluster.admin(cmd)

def check_buckets_sync_status_obj_not_exist(zone, buckets):
    for _ in range(config.checkpoint_retries):
        cmd = ['log', 'list'] + zone.zone_arg()
        log_list, ret = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        log_list = log_list.decode('utf-8')
        for bucket in buckets:
            if log_list.find(':'+bucket+":") >= 0:
                break
        else:
            return
        time.sleep(config.checkpoint_delay)
    assert False, 'sync status objects still exist for buckets=%s' % buckets

def gen_bucket_name():
    global num_buckets
    num_buckets += 1
    return run_prefix + '-' + str(num_buckets)
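
# e.g. with run_prefix == 'qwerty' this yields 'qwerty-1', 'qwerty-2', ...:
# names are unique within a run, and the random prefix keeps reruns from
# colliding with buckets left over from earlier runs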
class ZonegroupConns:
    def __init__(self, zonegroup):
        self.zonegroup = zonegroup
        self.zones = []
        self.ro_zones = []
        self.rw_zones = []
        self.master_zone = None
        for z in zonegroup.zones:
            zone_conn = z.get_conn(user.credentials)
            self.zones.append(zone_conn)
            if z.is_read_only():
                self.ro_zones.append(zone_conn)
            else:
                self.rw_zones.append(zone_conn)

            if z == zonegroup.master_zone:
                self.master_zone = zone_conn
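
# Typical usage, as in the tests below: build the conns once per test and
# address zones through the .zones / .rw_zones / .ro_zones views:
#
#   zonegroup = realm.master_zonegroup()
#   zonegroup_conns = ZonegroupConns(zonegroup)
#   buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)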
def check_all_buckets_exist(zone_conn, buckets):
    if not zone_conn.zone.has_buckets():
        return True

    for b in buckets:
        try:
            zone_conn.get_bucket(b)
        except:
            log.critical('zone %s does not contain bucket %s', zone_conn.name, b)
            return False

    return True

def check_all_buckets_dont_exist(zone_conn, buckets):
    if not zone_conn.zone.has_buckets():
        return True

    for b in buckets:
        try:
            zone_conn.get_bucket(b)
        except:
            continue

        log.critical('zone %s contains bucket %s', zone_conn.name, b)
        return False

    return True

def create_bucket_per_zone(zonegroup_conns, buckets_per_zone=1):
    buckets = []
    zone_bucket = []
    for zone in zonegroup_conns.rw_zones:
        for i in range(buckets_per_zone):
            bucket_name = gen_bucket_name()
            log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
            bucket = zone.create_bucket(bucket_name)
            buckets.append(bucket_name)
            zone_bucket.append((zone, bucket))

    return buckets, zone_bucket

def create_bucket_per_zone_in_realm():
    buckets = []
    zone_bucket = []
    for zonegroup in realm.current_period.zonegroups:
        zg_conn = ZonegroupConns(zonegroup)
        b, z = create_bucket_per_zone(zg_conn)
        buckets.extend(b)
        zone_bucket.extend(z)
    return buckets, zone_bucket

def test_bucket_create():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, _ = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

def test_bucket_recreate():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, _ = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

    # recreate buckets on all zones, make sure they weren't removed
    for zone in zonegroup_conns.rw_zones:
        for bucket_name in buckets:
            bucket = zone.create_bucket(bucket_name)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

def test_bucket_remove():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

    for zone, bucket_name in zone_bucket:
        zone.conn.delete_bucket(bucket_name)

    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_dont_exist(zone, buckets)

def get_bucket(zone, bucket_name):
    return zone.conn.get_bucket(bucket_name)

def get_key(zone, bucket_name, obj_name):
    b = get_bucket(zone, bucket_name)
    return b.get_key(obj_name)

def new_key(zone, bucket_name, obj_name):
    b = get_bucket(zone, bucket_name)
    return b.new_key(obj_name)

def check_bucket_eq(zone_conn1, zone_conn2, bucket):
    return zone_conn2.check_bucket_eq(zone_conn1, bucket.name)

def test_object_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = [ 'myobj', '_myobj', ':', '&' ]
    content = 'asdasd'

    # don't wait for meta sync just yet
    for zone, bucket_name in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket_name, objname)
            k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

def test_object_delete():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objname = 'myobj'
    content = 'asdasd'

    # don't wait for meta sync just yet
    for zone, bucket in zone_bucket:
        k = new_key(zone, bucket, objname)
        k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    # check object exists
    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

    # check object removal
    for source_conn, bucket in zone_bucket:
        k = get_key(source_conn, bucket, objname)
        k.delete()
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

def get_latest_object_version(key):
    for k in key.bucket.list_versions(key.name):
        if k.is_latest:
            return k
    return None

def test_versioned_object_incremental_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    # enable versioning on each bucket
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)

    zonegroup_meta_checkpoint(zonegroup)

    # upload a dummy object to each bucket and wait for sync. this forces each
    # bucket to finish a full sync and switch to incremental
    for source_conn, bucket in zone_bucket:
        new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)

    for _, bucket in zone_bucket:
        # create and delete multiple versions of an object from each zone
        for zone_conn in zonegroup_conns.rw_zones:
            obj = 'obj-' + zone_conn.name
            k = new_key(zone_conn, bucket, obj)

            k.set_contents_from_string('version1')
            v = get_latest_object_version(k)
            log.debug('version1 id=%s', v.version_id)
            # don't delete version1 - this tests that the initial version
            # doesn't get squashed into later versions

            # create and delete the following object versions to test that
            # the operations don't race with each other during sync
            k.set_contents_from_string('version2')
            v = get_latest_object_version(k)
            log.debug('version2 id=%s', v.version_id)
            k.bucket.delete_key(obj, version_id=v.version_id)

            k.set_contents_from_string('version3')
            v = get_latest_object_version(k)
            log.debug('version3 id=%s', v.version_id)
            k.bucket.delete_key(obj, version_id=v.version_id)

    for _, bucket in zone_bucket:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

def test_bucket_versioning():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)
        res = bucket.get_versioning_status()
        key = 'Versioning'
        assert(key in res and res[key] == 'Enabled')

def test_bucket_acl():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
        bucket.set_acl('public-read')
        assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers

def test_bucket_cors():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        cors_cfg = CORSConfiguration()
        cors_cfg.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000)
        bucket.set_cors(cors_cfg)
        assert(bucket.get_cors().to_xml() == cors_cfg.to_xml())

def test_bucket_delete_notempty():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone_conn, bucket_name in zone_bucket:
        # upload an object to each bucket on its own zone
        conn = zone_conn.get_connection()
        bucket = conn.get_bucket(bucket_name)
        k = bucket.new_key('foo')
        k.set_contents_from_string('bar')
        # attempt to delete the bucket before this object can sync
        try:
            conn.delete_bucket(bucket_name)
        except boto.exception.S3ResponseError as e:
            assert(e.error_code == 'BucketNotEmpty')
            continue
        assert False # expected 409 BucketNotEmpty

    # assert that each bucket still exists on the master
    c1 = zonegroup_conns.master_zone.conn
    for _, bucket_name in zone_bucket:
        assert c1.get_bucket(bucket_name)

def test_multi_period_incremental_sync():
    zonegroup = realm.master_zonegroup()
    if len(zonegroup.zones) < 3:
        raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")

    # periods to include in mdlog comparison
    mdlog_periods = [realm.current_period.id]

    # create a bucket in each zone
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    zonegroup_meta_checkpoint(zonegroup)

    z1, z2, z3 = zonegroup.zones[0:3]
    assert(z1 == zonegroup.master_zone)

    # kill zone 3 gateways to freeze sync status to incremental in first period
    z3.stop()

    # change master to zone 2 -> period 2
    set_master_zone(z2)
    mdlog_periods += [realm.current_period.id]

    for zone_conn, _ in zone_bucket:
        if zone_conn.zone == z3:
            continue
        bucket_name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
        bucket = zone_conn.conn.create_bucket(bucket_name)
        buckets.append(bucket_name)

    # wait for zone 1 to sync
    zone_meta_checkpoint(z1)

    # change master back to zone 1 -> period 3
    set_master_zone(z1)
    mdlog_periods += [realm.current_period.id]

    for zone_conn, bucket_name in zone_bucket:
        if zone_conn.zone == z3:
            continue
        bucket_name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
        bucket = zone_conn.conn.create_bucket(bucket_name)
        buckets.append(bucket_name)

    # restart zone 3 gateway and wait for sync
    z3.start()
    zonegroup_meta_checkpoint(zonegroup)

    # verify that we end up with the same objects
    for bucket_name in buckets:
        for source_conn, _ in zone_bucket:
            for target_conn in zonegroup_conns.zones:
                if source_conn.zone == target_conn.zone:
                    continue

                target_conn.check_bucket_eq(source_conn, bucket_name)

    # verify that mdlogs are not empty and match for each period
    for period in mdlog_periods:
        master_mdlog = mdlog_list(z1, period)
        assert len(master_mdlog) > 0
        for zone in zonegroup.zones:
            if zone == z1:
                continue
            mdlog = mdlog_list(zone, period)
            assert len(mdlog) == len(master_mdlog)

    # autotrim mdlogs for master zone
    mdlog_autotrim(z1)

    # autotrim mdlogs for peers
    for zone in zonegroup.zones:
        if zone == z1:
            continue
        mdlog_autotrim(zone)

    # verify that mdlogs are empty for each period
    for period in mdlog_periods:
        for zone in zonegroup.zones:
            mdlog = mdlog_list(zone, period)
            assert len(mdlog) == 0

def test_zonegroup_remove():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    if len(zonegroup.zones) < 2:
        raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")

    zonegroup_meta_checkpoint(zonegroup)
    z1, z2 = zonegroup.zones[0:2]
    c1, c2 = (z1.cluster, z2.cluster)

    # create a new zone in zonegroup on c2 and commit
    zone = Zone('remove', zonegroup, c2)
    zone.create(c2)
    zonegroup.zones.append(zone)
    zonegroup.period.update(zone, commit=True)

    # 'zonegroup remove' the new zone
    zonegroup.remove(c1, zone)

    # another 'zonegroup remove' should fail with ENOENT
    _, retcode = zonegroup.remove(c1, zone, check_retcode=False)
    assert(retcode == 2) # ENOENT

    # delete the new zone
    zone.delete(c2)

    # validate the resulting period
    zonegroup.period.update(z1, commit=True)

841 buckets
, zone_bucket
= create_bucket_per_zone_in_realm()
842 for _
, bucket
in zone_bucket
:
843 website_cfg
= WebsiteConfiguration(suffix
='index.html',error_key
='error.html')
845 bucket
.set_website_configuration(website_cfg
)
846 except boto
.exception
.S3ResponseError
as e
:
847 if e
.error_code
== 'MethodNotAllowed':
848 raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.")
849 assert(bucket
.get_website_configuration_with_xml()[1] == website_cfg
.to_xml())
851 def test_set_bucket_policy():
853 "Version": "2012-10-17",
859 buckets
, zone_bucket
= create_bucket_per_zone_in_realm()
860 for _
, bucket
in zone_bucket
:
861 bucket
.set_policy(policy
)
862 assert(bucket
.get_policy() == policy
)
def test_bucket_sync_disable():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)

    for zone in zonegroup.zones:
        check_buckets_sync_status_obj_not_exist(zone, buckets)

def test_bucket_sync_enable_right_after_disable():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = ['obj1', 'obj2', 'obj3', 'obj4']
    content = 'asdasd'

    for zone, bucket in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)
        enable_bucket_sync(realm.meta_master_zone(), bucket_name)

    objnames_2 = ['obj5', 'obj6', 'obj7', 'obj8']

    for zone, bucket in zone_bucket:
        for objname in objnames_2:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

def test_bucket_sync_disable_enable():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = [ 'obj1', 'obj2', 'obj3', 'obj4' ]
    content = 'asdasd'

    for zone, bucket in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)

    zonegroup_meta_checkpoint(zonegroup)

    objnames_2 = [ 'obj5', 'obj6', 'obj7', 'obj8' ]

    for zone, bucket in zone_bucket:
        for objname in objnames_2:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    for bucket_name in buckets:
        enable_bucket_sync(realm.meta_master_zone(), bucket_name)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

def test_multipart_object_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    _, bucket = zone_bucket[0]

    # initiate a multipart upload
    upload = bucket.initiate_multipart_upload('MULTIPART')
    mp = boto.s3.multipart.MultiPartUpload(bucket)
    mp.key_name = upload.key_name
    mp.id = upload.id
    part_size = 5 * 1024 * 1024 # 5M min part size
    mp.upload_part_from_file(StringIO('a' * part_size), 1)
    mp.upload_part_from_file(StringIO('b' * part_size), 2)
    mp.upload_part_from_file(StringIO('c' * part_size), 3)
    mp.upload_part_from_file(StringIO('d' * part_size), 4)
    mp.complete_upload()

    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

def test_encrypted_object_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    (zone1, zone2) = zonegroup_conns.rw_zones[0:2]

    # create a bucket on the first zone
    bucket_name = gen_bucket_name()
    log.info('create bucket zone=%s name=%s', zone1.name, bucket_name)
    bucket = zone1.conn.create_bucket(bucket_name)

    # upload an object with sse-c encryption
    sse_c_headers = {
        'x-amz-server-side-encryption-customer-algorithm': 'AES256',
        'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
        'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
    }
    key = bucket.new_key('testobj-sse-c')
    data = 'A'*512
    key.set_contents_from_string(data, headers=sse_c_headers)

    # upload an object with sse-kms encryption
    sse_kms_headers = {
        'x-amz-server-side-encryption': 'aws:kms',
        # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this)
        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
    }
    key = bucket.new_key('testobj-sse-kms')
    key.set_contents_from_string(data, headers=sse_kms_headers)

    # wait for the bucket metadata and data to sync
    zonegroup_meta_checkpoint(zonegroup)
    zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name)

    # read the encrypted objects from the second zone
    bucket2 = get_bucket(zone2, bucket_name)
    key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers)
    eq(data, key.get_contents_as_string(headers=sse_c_headers))

    key = bucket2.get_key('testobj-sse-kms')
    eq(data, key.get_contents_as_string())

def test_bucket_index_log_trim():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    zone = zonegroup_conns.rw_zones[0]

    # create a test bucket, upload some objects, and wait for sync
    def make_test_bucket():
        name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone.name, name)
        bucket = zone.conn.create_bucket(name)
        for objname in ('a', 'b', 'c', 'd'):
            k = new_key(zone, name, objname)
            k.set_contents_from_string('foo')
        zonegroup_meta_checkpoint(zonegroup)
        zonegroup_bucket_checkpoint(zonegroup_conns, name)
        return bucket

    # create a 'cold' bucket
    cold_bucket = make_test_bucket()

    # trim with max-buckets=0 to clear counters for cold bucket. this should
    # prevent it from being considered 'active' by the next autotrim
    bilog_autotrim(zone.zone, [
        '--rgw-sync-log-trim-max-buckets', '0',
    ])

    # create an 'active' bucket
    active_bucket = make_test_bucket()

    # trim with max-buckets=1 min-cold-buckets=0 to trim active bucket only
    bilog_autotrim(zone.zone, [
        '--rgw-sync-log-trim-max-buckets', '1',
        '--rgw-sync-log-trim-min-cold-buckets', '0',
    ])

    # verify active bucket has empty bilog
    active_bilog = bilog_list(zone.zone, active_bucket.name)
    assert(len(active_bilog) == 0)

    # verify cold bucket has nonempty bilog
    cold_bilog = bilog_list(zone.zone, cold_bucket.name)
    assert(len(cold_bilog) > 0)

    # trim with min-cold-buckets=999 to trim all buckets
    bilog_autotrim(zone.zone, [
        '--rgw-sync-log-trim-max-buckets', '999',
        '--rgw-sync-log-trim-min-cold-buckets', '999',
    ])

    # verify cold bucket has empty bilog
    cold_bilog = bilog_list(zone.zone, cold_bucket.name)
    assert(len(cold_bilog) == 0)