import json
import logging
import random
import string
import time

try:
    # python 2
    from itertools import izip_longest as zip_longest
except ImportError:
    # python 3
    from itertools import zip_longest
from itertools import combinations
from cStringIO import StringIO

import boto
import boto.s3.connection
from boto.s3.website import WebsiteConfiguration
from boto.s3.cors import CORSConfiguration

from nose.tools import eq_ as eq
from nose.plugins.attrib import attr
from nose.plugins.skip import SkipTest

from .multisite import Zone
from .conn import get_gateway_connection

class Config:
    """ test configuration """
    def __init__(self, **kwargs):
        # by default, wait up to 5 minutes before giving up on a sync checkpoint
        self.checkpoint_retries = kwargs.get('checkpoint_retries', 60)
        self.checkpoint_delay = kwargs.get('checkpoint_delay', 5)
        # allow some time for realm reconfiguration after changing master zone
        self.reconfigure_delay = kwargs.get('reconfigure_delay', 5)

# rgw multisite tests, written against the interfaces provided in rgw_multi.
# these tests must be initialized and run by another module that provides
# implementations of these interfaces by calling init_multi()
realm = None
user = None
config = None
def init_multi(_realm, _user, _config=None):
    global realm
    realm = _realm
    global user
    user = _user
    global config
    config = _config or Config()
    realm_meta_checkpoint(realm)

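# A minimal usage sketch (the bootstrap module and the realm/user variables
# here are hypothetical; only init_multi() and Config are defined in this
# file):
#
#   from rgw_multi import multisite, tests
#   # ... build realm/user objects from multisite ...
#   tests.init_multi(realm, user, tests.Config(checkpoint_retries=30))
#
# after which a test runner such as nose can collect and run the test_*
# functions below.
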
log = logging.getLogger(__name__)

num_buckets = 0
run_prefix = ''.join(random.choice(string.ascii_lowercase) for _ in range(6))

def get_gateway_connection(gateway, credentials):
    """ connect to the given gateway """
    if gateway.connection is None:
        gateway.connection = boto.connect_s3(
            aws_access_key_id=credentials.access_key,
            aws_secret_access_key=credentials.secret,
            host=gateway.host,
            port=gateway.port,
            is_secure=False,
            calling_format=boto.s3.connection.OrdinaryCallingFormat())
    return gateway.connection

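# OrdinaryCallingFormat forces path-style addressing (http://host:port/bucket)
# rather than virtual-hosted bucket names, so requests work against a bare
# radosgw host:port without wildcard DNS.
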
def get_zone_connection(zone, credentials):
    """ connect to the zone's first gateway """
    if isinstance(credentials, list):
        credentials = credentials[0]
    return get_gateway_connection(zone.gateways[0], credentials)

def mdlog_list(zone, period=None):
    cmd = ['mdlog', 'list']
    if period:
        cmd += ['--period', period]
    (mdlog_json, _) = zone.cluster.admin(cmd, read_only=True)
    mdlog_json = mdlog_json.decode('utf-8')
    return json.loads(mdlog_json)

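# zone.cluster.admin() shells out to radosgw-admin on that zone's cluster
# (read_only=True presumably tells the harness the command does not mutate
# state), so mdlog_list(zone, 'p1') runs, approximately:
#
#   radosgw-admin mdlog list --period p1
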
def meta_sync_status(zone):
    while True:
        cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
        meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break
        assert(retcode == 2) # ENOENT
        time.sleep(5)

def mdlog_autotrim(zone):
    zone.cluster.admin(['mdlog', 'autotrim'])

def bilog_list(zone, bucket, args=None):
    cmd = ['bilog', 'list', '--bucket', bucket] + (args or [])
    bilog, _ = zone.cluster.admin(cmd, read_only=True)
    bilog = bilog.decode('utf-8')
    return json.loads(bilog)

def bilog_autotrim(zone, args=None):
    zone.cluster.admin(['bilog', 'autotrim'] + (args or []))

def parse_meta_sync_status(meta_sync_status_json):
    meta_sync_status_json = meta_sync_status_json.decode('utf-8')
    log.debug('current meta sync status=%s', meta_sync_status_json)
    sync_status = json.loads(meta_sync_status_json)

    sync_info = sync_status['sync_status']['info']
    global_sync_status = sync_info['status']
    num_shards = sync_info['num_shards']
    period = sync_info['period']
    realm_epoch = sync_info['realm_epoch']

    sync_markers = sync_status['sync_status']['markers']
    log.debug('sync_markers=%s', sync_markers)
    assert(num_shards == len(sync_markers))

    markers = {}
    for i in range(num_shards):
        # get marker, only if it's an incremental marker for the same realm epoch
        if realm_epoch > sync_markers[i]['val']['realm_epoch'] or sync_markers[i]['val']['state'] == 0:
            markers[i] = ''
        else:
            markers[i] = sync_markers[i]['val']['marker']

    return period, realm_epoch, num_shards, markers

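# Shape of the JSON consumed above, as implied by the keys accessed (abridged;
# the real radosgw-admin output carries more fields):
#
#   {
#     "sync_status": {
#       "info": { "status": ..., "num_shards": N,
#                 "period": ..., "realm_epoch": E },
#       "markers": [ { "key": 0,
#                      "val": { "state": S, "marker": ...,
#                               "realm_epoch": E } }, ... ]
#     }
#   }
#
# note: the meta_sync_status() below shadows the bare polling variant defined
# earlier in this file.
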
def meta_sync_status(zone):
    for _ in range(config.checkpoint_retries):
        cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
        meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            return parse_meta_sync_status(meta_sync_status_json)
        assert(retcode == 2) # ENOENT
        time.sleep(config.checkpoint_delay)

    assert False, 'failed to read metadata sync status for zone=%s' % zone.name

def meta_master_log_status(master_zone):
    cmd = ['mdlog', 'status'] + master_zone.zone_args()
    mdlog_status_json, retcode = master_zone.cluster.admin(cmd, read_only=True)
    mdlog_status = json.loads(mdlog_status_json.decode('utf-8'))

    markers = {i: s['marker'] for i, s in enumerate(mdlog_status)}
    log.debug('master meta markers=%s', markers)
    return markers

def compare_meta_status(zone, log_status, sync_status):
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d, len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg):
        log.warning('zone %s behind master: %s', zone.name, msg)
        return False

    return True

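# Shard markers are strings that compare lexicographically; a shard is caught
# up once its sync marker is >= the master's log marker, so any l > s above
# reports that shard as behind.
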
def zone_meta_checkpoint(zone, meta_master_zone=None, master_status=None):
    if not meta_master_zone:
        meta_master_zone = zone.realm().meta_master_zone()
    if not master_status:
        master_status = meta_master_log_status(meta_master_zone)

    current_realm_epoch = realm.current_period.data['realm_epoch']

    log.info('starting meta checkpoint for zone=%s', zone.name)

    for _ in range(config.checkpoint_retries):
        period, realm_epoch, num_shards, sync_status = meta_sync_status(zone)
        if realm_epoch < current_realm_epoch:
            log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d',
                        zone.name, realm_epoch, current_realm_epoch)
        else:
            log.debug('log_status=%s', master_status)
            log.debug('sync_status=%s', sync_status)
            if compare_meta_status(zone, master_status, sync_status):
                log.info('finish meta checkpoint for zone=%s', zone.name)
                return

        time.sleep(config.checkpoint_delay)
    assert False, 'failed meta checkpoint for zone=%s' % zone.name

def zonegroup_meta_checkpoint(zonegroup, meta_master_zone=None, master_status=None):
    if not meta_master_zone:
        meta_master_zone = zonegroup.realm().meta_master_zone()
    if not master_status:
        master_status = meta_master_log_status(meta_master_zone)

    for zone in zonegroup.zones:
        if zone == meta_master_zone:
            continue
        zone_meta_checkpoint(zone, meta_master_zone, master_status)

def realm_meta_checkpoint(realm):
    log.info('meta checkpoint')

    meta_master_zone = realm.meta_master_zone()
    master_status = meta_master_log_status(meta_master_zone)

    for zonegroup in realm.current_period.zonegroups:
        zonegroup_meta_checkpoint(zonegroup, meta_master_zone, master_status)

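# The metadata checkpoints nest: realm_meta_checkpoint() fans out to every
# zonegroup, zonegroup_meta_checkpoint() to every non-master zone, and
# zone_meta_checkpoint() polls one zone until its sync markers catch up with
# the meta master's mdlog.
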
def parse_data_sync_status(data_sync_status_json):
    data_sync_status_json = data_sync_status_json.decode('utf-8')
    log.debug('current data sync status=%s', data_sync_status_json)
    sync_status = json.loads(data_sync_status_json)

    global_sync_status = sync_status['sync_status']['info']['status']
    num_shards = sync_status['sync_status']['info']['num_shards']

    sync_markers = sync_status['sync_status']['markers']
    log.debug('sync_markers=%s', sync_markers)
    assert(num_shards == len(sync_markers))

    markers = {}
    for i in range(num_shards):
        markers[i] = sync_markers[i]['val']['marker']

    return (num_shards, markers)

def data_sync_status(target_zone, source_zone):
    if target_zone == source_zone:
        return None

    for _ in range(config.checkpoint_retries):
        cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
        cmd += ['--source-zone', source_zone.name]
        data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            return parse_data_sync_status(data_sync_status_json)

        assert(retcode == 2) # ENOENT
        time.sleep(config.checkpoint_delay)

    assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \
                  (target_zone.name, source_zone.name)

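# retcode 2 (ENOENT) is tolerated in these status loops because the sync
# status objects may not exist yet on a fresh zone; the loops simply retry
# until radosgw-admin can read them.
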
def bucket_sync_status(target_zone, source_zone, bucket_name):
    if target_zone == source_zone:
        return None

    cmd = ['bucket', 'sync', 'status'] + target_zone.zone_args()
    cmd += ['--source-zone', source_zone.name]
    cmd += ['--bucket', bucket_name]
    while True:
        bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break
        assert(retcode == 2) # ENOENT

    bucket_sync_status_json = bucket_sync_status_json.decode('utf-8')
    log.debug('current bucket sync status=%s', bucket_sync_status_json)
    sync_status = json.loads(bucket_sync_status_json)

    markers = {}
    for entry in sync_status:
        val = entry['val']
        if val['status'] == 'incremental-sync':
            pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
        else:
            pos = ''
        markers[entry['key']] = pos

    return markers

def data_source_log_status(source_zone):
    source_cluster = source_zone.cluster
    cmd = ['datalog', 'status'] + source_zone.zone_args()
    datalog_status_json, retcode = source_cluster.rgw_admin(cmd, read_only=True)
    datalog_status = json.loads(datalog_status_json.decode('utf-8'))

    markers = {i: s['marker'] for i, s in enumerate(datalog_status)}
    log.debug('data markers for zone=%s markers=%s', source_zone.name, markers)
    return markers

def bucket_source_log_status(source_zone, bucket_name):
    cmd = ['bilog', 'status'] + source_zone.zone_args()
    cmd += ['--bucket', bucket_name]
    source_cluster = source_zone.cluster
    bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
    bilog_status = json.loads(bilog_status_json.decode('utf-8'))

    # a missing 'markers' section just means an empty bilog
    markers = {}
    for s in bilog_status.get('markers', []):
        markers[s['key']] = s['val']

    log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers)
    return markers

def compare_data_status(target_zone, source_zone, log_status, sync_status):
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg):
        log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, msg)
        return False

    return True

def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg):
        log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, msg)
        return False

    return True

def zone_data_checkpoint(target_zone, source_zone):
    if target_zone == source_zone:
        return

    log_status = data_source_log_status(source_zone)
    log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)

    for _ in range(config.checkpoint_retries):
        num_shards, sync_status = data_sync_status(target_zone, source_zone)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        if compare_data_status(target_zone, source_zone, log_status, sync_status):
            log.info('finished data checkpoint for target_zone=%s source_zone=%s',
                     target_zone.name, source_zone.name)
            return

        time.sleep(config.checkpoint_delay)

    assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
                  (target_zone.name, source_zone.name)

def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
    if target_zone == source_zone:
        return

    log_status = bucket_source_log_status(source_zone, bucket_name)
    log.info('starting bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)

    for _ in range(config.checkpoint_retries):
        sync_status = bucket_sync_status(target_zone, source_zone, bucket_name)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        if compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
            log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
            return

        time.sleep(config.checkpoint_delay)

    assert False, 'failed bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \
                  (target_zone.name, source_zone.name, bucket_name)

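# zone_meta_checkpoint(), zone_data_checkpoint() and zone_bucket_checkpoint()
# all follow the same poll loop: snapshot the source's log markers once, then
# repeatedly compare the target's sync markers against that snapshot, sleeping
# config.checkpoint_delay between attempts and giving up after
# config.checkpoint_retries.
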
def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
    for source_conn in zonegroup_conns.zones:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket_name)
            target_conn.check_bucket_eq(source_conn, bucket_name)

def set_master_zone(zone):
    zone.modify(zone.cluster, ['--master'])
    zonegroup = zone.zonegroup
    zonegroup.period.update(zone, commit=True)
    zonegroup.master_zone = zone
    log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay)
    time.sleep(config.reconfigure_delay)

def enable_bucket_sync(zone, bucket_name):
    cmd = ['bucket', 'sync', 'enable', '--bucket', bucket_name] + zone.zone_args()
    zone.cluster.admin(cmd)

def disable_bucket_sync(zone, bucket_name):
    cmd = ['bucket', 'sync', 'disable', '--bucket', bucket_name] + zone.zone_args()
    zone.cluster.admin(cmd)

def check_buckets_sync_status_obj_not_exist(zone, buckets):
    for _ in range(config.checkpoint_retries):
        cmd = ['log', 'list'] + zone.zone_arg()
        log_list, ret = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        for bucket in buckets:
            if log_list.find(':' + bucket + ":") >= 0:
                break
        else:
            return
        time.sleep(config.checkpoint_delay)
    assert False

def gen_bucket_name():
    global num_buckets

    num_buckets += 1
    return run_prefix + '-' + str(num_buckets)

class ZonegroupConns:
    def __init__(self, zonegroup):
        self.zonegroup = zonegroup
        self.zones = []
        self.ro_zones = []
        self.rw_zones = []
        self.master_zone = None
        for z in zonegroup.zones:
            zone_conn = z.get_conn(user.credentials)
            self.zones.append(zone_conn)
            if z.is_read_only():
                self.ro_zones.append(zone_conn)
            else:
                self.rw_zones.append(zone_conn)

            if z == zonegroup.master_zone:
                self.master_zone = zone_conn

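# Typical test preamble built from the helpers in this file:
#
#   zonegroup = realm.master_zonegroup()
#   zonegroup_conns = ZonegroupConns(zonegroup)
#   buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
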
def check_all_buckets_exist(zone_conn, buckets):
    if not zone_conn.zone.has_buckets():
        return True

    for b in buckets:
        try:
            zone_conn.get_bucket(b)
        except:
            log.critical('zone %s does not contain bucket %s', zone_conn.name, b)
            return False

    return True

def check_all_buckets_dont_exist(zone_conn, buckets):
    if not zone_conn.zone.has_buckets():
        return True

    for b in buckets:
        try:
            zone_conn.get_bucket(b)
        except:
            continue

        log.critical('zone %s contains bucket %s', zone_conn.name, b)
        return False

    return True

def create_bucket_per_zone(zonegroup_conns, buckets_per_zone=1):
    buckets = []
    zone_bucket = []
    for zone in zonegroup_conns.rw_zones:
        for i in xrange(buckets_per_zone):
            bucket_name = gen_bucket_name()
            log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
            bucket = zone.create_bucket(bucket_name)
            buckets.append(bucket_name)
            zone_bucket.append((zone, bucket))

    return buckets, zone_bucket

def create_bucket_per_zone_in_realm():
    buckets = []
    zone_bucket = []
    for zonegroup in realm.current_period.zonegroups:
        zg_conn = ZonegroupConns(zonegroup)
        b, z = create_bucket_per_zone(zg_conn)
        buckets.extend(b)
        zone_bucket.extend(z)
    return buckets, zone_bucket

def test_bucket_create():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, _ = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

def test_bucket_recreate():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, _ = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

    # recreate buckets on all zones, make sure they weren't removed
    for zone in zonegroup_conns.rw_zones:
        for bucket_name in buckets:
            bucket = zone.create_bucket(bucket_name)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

def test_bucket_remove():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

    for zone, bucket_name in zone_bucket:
        zone.conn.delete_bucket(bucket_name)

    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_dont_exist(zone, buckets)

def get_bucket(zone, bucket_name):
    return zone.conn.get_bucket(bucket_name)

def get_key(zone, bucket_name, obj_name):
    b = get_bucket(zone, bucket_name)
    return b.get_key(obj_name)

def new_key(zone, bucket_name, obj_name):
    b = get_bucket(zone, bucket_name)
    return b.new_key(obj_name)

def check_bucket_eq(zone_conn1, zone_conn2, bucket):
    return zone_conn2.check_bucket_eq(zone_conn1, bucket.name)

def test_object_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = [ 'myobj', '_myobj', ':', '&' ]
    content = 'asdasd'

    # don't wait for meta sync just yet
    for zone, bucket_name in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket_name, objname)
            k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

def test_object_delete():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objname = 'myobj'
    content = 'asdasd'

    # don't wait for meta sync just yet
    for zone, bucket in zone_bucket:
        k = new_key(zone, bucket, objname)
        k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    # check object exists
    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

    # check object removal
    for source_conn, bucket in zone_bucket:
        k = get_key(source_conn, bucket, objname)
        k.delete()
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

def get_latest_object_version(key):
    for k in key.bucket.list_versions(key.name):
        if k.is_latest:
            return k
    return None

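# boto's list_versions() includes an is_latest flag on the entry that is the
# object's current version; scanning for it yields the head version of
# key.name.
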
def test_versioned_object_incremental_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)

    zonegroup_meta_checkpoint(zonegroup)

    # upload a dummy object to each bucket and wait for sync. this forces each
    # bucket to finish a full sync and switch to incremental
    for source_conn, bucket in zone_bucket:
        new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)

    for _, bucket in zone_bucket:
        # create and delete multiple versions of an object from each zone
        for zone_conn in zonegroup_conns.rw_zones:
            obj = 'obj-' + zone_conn.name
            k = new_key(zone_conn, bucket, obj)

            k.set_contents_from_string('version1')
            v = get_latest_object_version(k)
            log.debug('version1 id=%s', v.version_id)
            # don't delete version1 - this tests that the initial version
            # doesn't get squashed into later versions

            # create and delete the following object versions to test that
            # the operations don't race with each other during sync
            k.set_contents_from_string('version2')
            v = get_latest_object_version(k)
            log.debug('version2 id=%s', v.version_id)
            k.bucket.delete_key(obj, version_id=v.version_id)

            k.set_contents_from_string('version3')
            v = get_latest_object_version(k)
            log.debug('version3 id=%s', v.version_id)
            k.bucket.delete_key(obj, version_id=v.version_id)

    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

def test_bucket_versioning():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)
        res = bucket.get_versioning_status()
        key = 'Versioning'
        assert(key in res and res[key] == 'Enabled')

def test_bucket_acl():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
        bucket.set_acl('public-read')
        assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers

def test_bucket_cors():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        cors_cfg = CORSConfiguration()
        cors_cfg.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000)
        bucket.set_cors(cors_cfg)
        assert(bucket.get_cors().to_xml() == cors_cfg.to_xml())

def test_bucket_delete_notempty():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone_conn, bucket_name in zone_bucket:
        # upload an object to each bucket on its own zone
        conn = zone_conn.get_connection()
        bucket = conn.get_bucket(bucket_name)
        k = bucket.new_key('foo')
        k.set_contents_from_string('bar')
        # attempt to delete the bucket before this object can sync
        try:
            conn.delete_bucket(bucket_name)
        except boto.exception.S3ResponseError as e:
            assert(e.error_code == 'BucketNotEmpty')
            continue
        assert False # expected 409 BucketNotEmpty

    # assert that each bucket still exists on the master
    c1 = zonegroup_conns.master_zone.conn
    for _, bucket_name in zone_bucket:
        assert c1.get_bucket(bucket_name)

def test_multi_period_incremental_sync():
    zonegroup = realm.master_zonegroup()
    if len(zonegroup.zones) < 3:
        raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")

    # periods to include in mdlog comparison
    mdlog_periods = [realm.current_period.id]

    # create a bucket in each zone
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    zonegroup_meta_checkpoint(zonegroup)

    z1, z2, z3 = zonegroup.zones[0:3]
    assert(z1 == zonegroup.master_zone)

    # kill zone 3 gateways to freeze sync status to incremental in first period
    z3.stop()

    # change master to zone 2 -> period 2
    set_master_zone(z2)
    mdlog_periods += [realm.current_period.id]

    for zone_conn, _ in zone_bucket:
        if zone_conn.zone == z3:
            continue
        bucket_name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
        bucket = zone_conn.conn.create_bucket(bucket_name)
        buckets.append(bucket_name)

    # wait for zone 1 to sync
    zone_meta_checkpoint(z1)

    # change master back to zone 1 -> period 3
    set_master_zone(z1)
    mdlog_periods += [realm.current_period.id]

    for zone_conn, bucket_name in zone_bucket:
        if zone_conn.zone == z3:
            continue
        bucket_name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
        bucket = zone_conn.conn.create_bucket(bucket_name)
        buckets.append(bucket_name)

    # restart zone 3 gateway and wait for sync
    z3.start()
    zonegroup_meta_checkpoint(zonegroup)

    # verify that we end up with the same objects
    for bucket_name in buckets:
        for source_conn, _ in zone_bucket:
            for target_conn in zonegroup_conns.zones:
                if source_conn.zone == target_conn.zone:
                    continue

                target_conn.check_bucket_eq(source_conn, bucket_name)

    # verify that mdlogs are not empty and match for each period
    for period in mdlog_periods:
        master_mdlog = mdlog_list(z1, period)
        assert len(master_mdlog) > 0
        for zone in zonegroup.zones:
            if zone == z1:
                continue
            mdlog = mdlog_list(zone, period)
            assert len(mdlog) == len(master_mdlog)

    # autotrim mdlogs for master zone
    mdlog_autotrim(z1)

    # autotrim mdlogs for peers
    for zone in zonegroup.zones:
        if zone == z1:
            continue
        mdlog_autotrim(zone)

    # verify that mdlogs are empty for each period
    for period in mdlog_periods:
        for zone in zonegroup.zones:
            mdlog = mdlog_list(zone, period)
            assert len(mdlog) == 0

def test_zonegroup_remove():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    if len(zonegroup.zones) < 2:
        raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")

    zonegroup_meta_checkpoint(zonegroup)
    z1, z2 = zonegroup.zones[0:2]
    c1, c2 = (z1.cluster, z2.cluster)

    # create a new zone in zonegroup on c2 and commit
    zone = Zone('remove', zonegroup, c2)
    zone.create(c2)
    zonegroup.zones.append(zone)
    zonegroup.period.update(zone, commit=True)

    zonegroup.remove(c1, zone)

    # another 'zonegroup remove' should fail with ENOENT
    _, retcode = zonegroup.remove(c1, zone, check_retcode=False)
    assert(retcode == 2) # ENOENT

    # delete the new zone
    zone.delete(c2)

    # validate the resulting period
    zonegroup.period.update(z1, commit=True)

def test_set_bucket_website():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        website_cfg = WebsiteConfiguration(suffix='index.html', error_key='error.html')
        try:
            bucket.set_website_configuration(website_cfg)
        except boto.exception.S3ResponseError as e:
            if e.error_code == 'MethodNotAllowed':
                raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.")
        assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml())

def test_set_bucket_policy():
    policy = '''{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": "*"
}]
}'''
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        bucket.set_policy(policy)
        assert(bucket.get_policy() == policy)

def test_bucket_sync_disable():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)

    for zone in zonegroup.zones:
        check_buckets_sync_status_obj_not_exist(zone, buckets)

def test_bucket_sync_enable_right_after_disable():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = ['obj1', 'obj2', 'obj3', 'obj4']
    content = 'asdasd'

    for zone, bucket in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)
        enable_bucket_sync(realm.meta_master_zone(), bucket_name)

    objnames_2 = ['obj5', 'obj6', 'obj7', 'obj8']

    for zone, bucket in zone_bucket:
        for objname in objnames_2:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

def test_bucket_sync_disable_enable():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = [ 'obj1', 'obj2', 'obj3', 'obj4' ]
    content = 'asdasd'

    for zone, bucket in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)

    zonegroup_meta_checkpoint(zonegroup)

    objnames_2 = [ 'obj5', 'obj6', 'obj7', 'obj8' ]

    for zone, bucket in zone_bucket:
        for objname in objnames_2:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    for bucket_name in buckets:
        enable_bucket_sync(realm.meta_master_zone(), bucket_name)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

def test_multipart_object_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    _, bucket = zone_bucket[0]

    # initiate a multipart upload
    upload = bucket.initiate_multipart_upload('MULTIPART')
    mp = boto.s3.multipart.MultiPartUpload(bucket)
    mp.key_name = upload.key_name
    mp.id = upload.id
    part_size = 5 * 1024 * 1024 # 5M min part size
    mp.upload_part_from_file(StringIO('a' * part_size), 1)
    mp.upload_part_from_file(StringIO('b' * part_size), 2)
    mp.upload_part_from_file(StringIO('c' * part_size), 3)
    mp.upload_part_from_file(StringIO('d' * part_size), 4)
    mp.complete_upload()

    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

def test_encrypted_object_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    (zone1, zone2) = zonegroup_conns.rw_zones[0:2]

    # create a bucket on the first zone
    bucket_name = gen_bucket_name()
    log.info('create bucket zone=%s name=%s', zone1.name, bucket_name)
    bucket = zone1.conn.create_bucket(bucket_name)

    # upload an object with sse-c encryption
    sse_c_headers = {
        'x-amz-server-side-encryption-customer-algorithm': 'AES256',
        'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
        'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
    }
    key = bucket.new_key('testobj-sse-c')
    data = 'A'*512
    key.set_contents_from_string(data, headers=sse_c_headers)

    # upload an object with sse-kms encryption
    sse_kms_headers = {
        'x-amz-server-side-encryption': 'aws:kms',
        # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this)
        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
    }
    key = bucket.new_key('testobj-sse-kms')
    key.set_contents_from_string(data, headers=sse_kms_headers)

    # wait for the bucket metadata and data to sync
    zonegroup_meta_checkpoint(zonegroup)
    zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name)

    # read the encrypted objects from the second zone
    bucket2 = get_bucket(zone2, bucket_name)
    key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers)
    eq(data, key.get_contents_as_string(headers=sse_c_headers))

    key = bucket2.get_key('testobj-sse-kms')
    eq(data, key.get_contents_as_string())

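# SSE-C requires the same customer key headers on GET as on PUT, which is why
# sse_c_headers is passed when reading the object back; the SSE-KMS read needs
# no extra headers because the kms key id is stored with the object.
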
def test_bucket_index_log_trim():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    zone = zonegroup_conns.rw_zones[0]

    # create a test bucket, upload some objects, and wait for sync
    def make_test_bucket():
        name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone.name, name)
        bucket = zone.conn.create_bucket(name)
        for objname in ('a', 'b', 'c', 'd'):
            k = new_key(zone, name, objname)
            k.set_contents_from_string('foo')
        zonegroup_meta_checkpoint(zonegroup)
        zonegroup_bucket_checkpoint(zonegroup_conns, name)
        return bucket

    # create a 'cold' bucket
    cold_bucket = make_test_bucket()

    # trim with max-buckets=0 to clear counters for cold bucket. this should
    # prevent it from being considered 'active' by the next autotrim
    bilog_autotrim(zone.zone, [
        '--rgw-sync-log-trim-max-buckets', '0',
    ])

    # create an 'active' bucket
    active_bucket = make_test_bucket()

    # trim with max-buckets=1 min-cold-buckets=0 to trim active bucket only
    bilog_autotrim(zone.zone, [
        '--rgw-sync-log-trim-max-buckets', '1',
        '--rgw-sync-log-trim-min-cold-buckets', '0',
    ])

    # verify active bucket has empty bilog
    active_bilog = bilog_list(zone.zone, active_bucket.name)
    assert(len(active_bilog) == 0)

    # verify cold bucket has nonempty bilog
    cold_bilog = bilog_list(zone.zone, cold_bucket.name)
    assert(len(cold_bilog) > 0)

    # trim with min-cold-buckets=999 to trim all buckets
    bilog_autotrim(zone.zone, [
        '--rgw-sync-log-trim-max-buckets', '999',
        '--rgw-sync-log-trim-min-cold-buckets', '999',
    ])

    # verify cold bucket has empty bilog
    cold_bilog = bilog_list(zone.zone, cold_bucket.name)
    assert(len(cold_bilog) == 0)

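# Note on the --rgw-sync-log-trim-* overrides used above (semantics inferred
# from the scenarios in this test): max-buckets caps how many of the most
# active buckets each autotrim pass will process, and min-cold-buckets adds
# that many least-active 'cold' buckets to the same pass.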