# ceph/src/test/rgw/rgw_multi/tests.py
import json
import random
import string
import time
import logging
import errno
import dateutil.parser

from itertools import combinations
from itertools import zip_longest
from io import StringIO

import boto
import boto.s3.connection
from boto.s3.website import WebsiteConfiguration
from boto.s3.cors import CORSConfiguration

from nose.tools import eq_ as eq
from nose.tools import assert_not_equal, assert_equal
from nose.plugins.attrib import attr
from nose.plugins.skip import SkipTest

from .multisite import Zone, ZoneGroup, Credentials

from .conn import get_gateway_connection
from .tools import assert_raises
30 """ test configuration """
31 def __init__(self
, **kwargs
):
32 # by default, wait up to 5 minutes before giving up on a sync checkpoint
33 self
.checkpoint_retries
= kwargs
.get('checkpoint_retries', 60)
34 self
.checkpoint_delay
= kwargs
.get('checkpoint_delay', 5)
35 # allow some time for realm reconfiguration after changing master zone
36 self
.reconfigure_delay
= kwargs
.get('reconfigure_delay', 5)
37 self
.tenant
= kwargs
.get('tenant', '')

# rgw multisite tests, written against the interfaces provided in rgw_multi.
# these tests must be initialized and run by another module that provides
# implementations of these interfaces by calling init_multi()
realm = None
user = None
config = None

def init_multi(_realm, _user, _config=None):
    global realm
    realm = _realm
    global user
    user = _user
    global config
    config = _config or Config()
    realm_meta_checkpoint(realm)

def get_user():
    return user.id if user is not None else ''

def get_tenant():
    return config.tenant if config is not None and config.tenant is not None else ''

log = logging.getLogger('rgw_multi.tests')

num_buckets = 0
num_roles = 0
run_prefix = ''.join(random.choice(string.ascii_lowercase) for _ in range(6))
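
# Illustrative sketch (editorial addition, not part of the original suite):
# a driver module is expected to build the realm and user, then call
# init_multi() before any test runs, e.g.:
#
#   from rgw_multi import tests
#   tests.init_multi(realm, user, tests.Config(checkpoint_retries=120))
#
# Config's knobs trade runtime against flakiness: more retries and a longer
# delay tolerate slower sync at the cost of slower failure detection.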

def get_zone_connection(zone, credentials):
    """ connect to the zone's first gateway """
    if isinstance(credentials, list):
        credentials = credentials[0]
    return get_gateway_connection(zone.gateways[0], credentials)

def mdlog_list(zone, period=None):
    cmd = ['mdlog', 'list']
    if period:
        cmd += ['--period', period]
    (mdlog_json, _) = zone.cluster.admin(cmd, read_only=True)
    return json.loads(mdlog_json)

def mdlog_autotrim(zone):
    zone.cluster.admin(['mdlog', 'autotrim'])

def datalog_list(zone, args=None):
    cmd = ['datalog', 'list'] + (args or [])
    (datalog_json, _) = zone.cluster.admin(cmd, read_only=True)
    return json.loads(datalog_json)

def datalog_status(zone):
    cmd = ['datalog', 'status']
    (datalog_json, _) = zone.cluster.admin(cmd, read_only=True)
    return json.loads(datalog_json)

def datalog_autotrim(zone):
    zone.cluster.admin(['datalog', 'autotrim'])

def bilog_list(zone, bucket, args=None):
    cmd = ['bilog', 'list', '--bucket', bucket] + (args or [])
    cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
    bilog, _ = zone.cluster.admin(cmd, read_only=True)
    return json.loads(bilog)

def bilog_autotrim(zone, args=None):
    zone.cluster.admin(['bilog', 'autotrim'] + (args or []))

def bucket_layout(zone, bucket, args=None):
    (bl_output, _) = zone.cluster.admin(['bucket', 'layout', '--bucket', bucket] + (args or []))
    return json.loads(bl_output)

def parse_meta_sync_status(meta_sync_status_json):
    log.debug('current meta sync status=%s', meta_sync_status_json)
    sync_status = json.loads(meta_sync_status_json)

    sync_info = sync_status['sync_status']['info']
    global_sync_status = sync_info['status']
    num_shards = sync_info['num_shards']
    period = sync_info['period']
    realm_epoch = sync_info['realm_epoch']

    sync_markers = sync_status['sync_status']['markers']
    log.debug('sync_markers=%s', sync_markers)
    assert(num_shards == len(sync_markers))

    markers = {}
    for i in range(num_shards):
        # get marker, only if it's an incremental marker for the same realm epoch
        if realm_epoch > sync_markers[i]['val']['realm_epoch'] or sync_markers[i]['val']['state'] == 0:
            markers[i] = ''
        else:
            markers[i] = sync_markers[i]['val']['marker']

    return period, realm_epoch, num_shards, markers
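
# For reference (editorial note): parse_meta_sync_status() yields a tuple
# (period, realm_epoch, num_shards, markers), where `markers` maps a shard id
# to its incremental-sync marker, or to '' when the shard is still in full
# sync or lags the current realm epoch; since '' compares lexically below any
# real marker, such a shard always reads as "behind" in compare_meta_status().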

def meta_sync_status(zone):
    for _ in range(config.checkpoint_retries):
        cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
        meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            return parse_meta_sync_status(meta_sync_status_json)
        assert(retcode == 2) # ENOENT
        time.sleep(config.checkpoint_delay)

    assert False, 'failed to read metadata sync status for zone=%s' % zone.name

def meta_master_log_status(master_zone):
    cmd = ['mdlog', 'status'] + master_zone.zone_args()
    mdlog_status_json, retcode = master_zone.cluster.admin(cmd, read_only=True)
    mdlog_status = json.loads(mdlog_status_json)

    markers = {i: s['marker'] for i, s in enumerate(mdlog_status)}
    log.debug('master meta markers=%s', markers)
    return markers

def compare_meta_status(zone, log_status, sync_status):
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d, len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg):
        log.warning('zone %s behind master: %s', zone.name, msg)
        return False

    return True

def zone_meta_checkpoint(zone, meta_master_zone=None, master_status=None):
    if not meta_master_zone:
        meta_master_zone = zone.realm().meta_master_zone()
    if not master_status:
        master_status = meta_master_log_status(meta_master_zone)

    current_realm_epoch = realm.current_period.data['realm_epoch']

    log.info('starting meta checkpoint for zone=%s', zone.name)

    for _ in range(config.checkpoint_retries):
        period, realm_epoch, num_shards, sync_status = meta_sync_status(zone)
        if realm_epoch < current_realm_epoch:
            log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d',
                        zone.name, realm_epoch, current_realm_epoch)
        else:
            log.debug('log_status=%s', master_status)
            log.debug('sync_status=%s', sync_status)
            if compare_meta_status(zone, master_status, sync_status):
                log.info('finish meta checkpoint for zone=%s', zone.name)
                return

        time.sleep(config.checkpoint_delay)
    assert False, 'failed meta checkpoint for zone=%s' % zone.name

def zonegroup_meta_checkpoint(zonegroup, meta_master_zone=None, master_status=None):
    if not meta_master_zone:
        meta_master_zone = zonegroup.realm().meta_master_zone()
    if not master_status:
        master_status = meta_master_log_status(meta_master_zone)

    for zone in zonegroup.zones:
        if zone == meta_master_zone:
            continue
        zone_meta_checkpoint(zone, meta_master_zone, master_status)

def realm_meta_checkpoint(realm):
    log.info('meta checkpoint')

    meta_master_zone = realm.meta_master_zone()
    master_status = meta_master_log_status(meta_master_zone)

    for zonegroup in realm.current_period.zonegroups:
        zonegroup_meta_checkpoint(zonegroup, meta_master_zone, master_status)
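
# Illustrative sketch (editorial addition): the checkpoints nest, so a test
# that only needs one zonegroup can barrier at the appropriate scope:
def _example_meta_barrier(zonegroup):
    # single zonegroup; realm_meta_checkpoint(realm) would walk all of them
    zonegroup_meta_checkpoint(zonegroup)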

def parse_data_sync_status(data_sync_status_json):
    log.debug('current data sync status=%s', data_sync_status_json)
    sync_status = json.loads(data_sync_status_json)

    global_sync_status = sync_status['sync_status']['info']['status']
    num_shards = sync_status['sync_status']['info']['num_shards']

    sync_markers = sync_status['sync_status']['markers']
    log.debug('sync_markers=%s', sync_markers)
    assert(num_shards == len(sync_markers))

    markers = {}
    for i in range(num_shards):
        markers[i] = sync_markers[i]['val']['marker']

    return (num_shards, markers)

def data_sync_status(target_zone, source_zone):
    if target_zone == source_zone:
        return None

    for _ in range(config.checkpoint_retries):
        cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
        cmd += ['--source-zone', source_zone.name]
        data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            return parse_data_sync_status(data_sync_status_json)

        assert(retcode == 2) # ENOENT
        time.sleep(config.checkpoint_delay)

    assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \
                  (target_zone.name, source_zone.name)

def bucket_sync_status(target_zone, source_zone, bucket_name):
    if target_zone == source_zone:
        return None

    cmd = ['bucket', 'sync', 'markers'] + target_zone.zone_args()
    cmd += ['--source-zone', source_zone.name]
    cmd += ['--bucket', bucket_name]
    cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
    while True:
        bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break
        assert(retcode == 2) # ENOENT

    sync_status = json.loads(bucket_sync_status_json)

    markers = {}
    for entry in sync_status:
        val = entry['val']
        pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
        markers[entry['key']] = pos

    return markers
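
# Illustrative sketch (editorial addition): the *_sync_status helpers return
# {shard: marker} maps; a manual probe of one replication pairing might be:
def _example_probe_data_sync(target_zone, source_zone):
    result = data_sync_status(target_zone, source_zone)
    if result:
        num_shards, markers = result
        log.info('%d shards, markers=%s', num_shards, markers)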

def data_source_log_status(source_zone):
    source_cluster = source_zone.cluster
    cmd = ['datalog', 'status'] + source_zone.zone_args()
    datalog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
    datalog_status = json.loads(datalog_status_json)

    markers = {i: s['marker'] for i, s in enumerate(datalog_status)}
    log.debug('data markers for zone=%s markers=%s', source_zone.name, markers)
    return markers

def bucket_source_log_status(source_zone, bucket_name):
    cmd = ['bilog', 'status'] + source_zone.zone_args()
    cmd += ['--bucket', bucket_name]
    cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
    source_cluster = source_zone.cluster
    bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
    bilog_status = json.loads(bilog_status_json)

    m = {}
    markers = {}
    try:
        m = bilog_status['markers']
    except:
        pass

    for s in m:
        markers[s['key']] = s['val']

    log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers)
    return markers

def compare_data_status(target_zone, source_zone, log_status, sync_status):
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg):
        log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, msg)
        return False

    return True

def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg):
        log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, msg)
        return False

    return True

def zone_data_checkpoint(target_zone, source_zone):
    if not target_zone.syncs_from(source_zone.name):
        return

    log_status = data_source_log_status(source_zone)
    log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)

    for _ in range(config.checkpoint_retries):
        num_shards, sync_status = data_sync_status(target_zone, source_zone)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        if compare_data_status(target_zone, source_zone, log_status, sync_status):
            log.info('finished data checkpoint for target_zone=%s source_zone=%s',
                     target_zone.name, source_zone.name)
            return
        time.sleep(config.checkpoint_delay)

    assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
                  (target_zone.name, source_zone.name)

def zonegroup_data_checkpoint(zonegroup_conns):
    for source_conn in zonegroup_conns.rw_zones:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            log.debug('data checkpoint: source=%s target=%s', source_conn.zone.name, target_conn.zone.name)
            zone_data_checkpoint(target_conn.zone, source_conn.zone)

def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
    if not target_zone.syncs_from(source_zone.name):
        return

    cmd = ['bucket', 'sync', 'checkpoint']
    cmd += ['--bucket', bucket_name, '--source-zone', source_zone.name]
    retry_delay_ms = config.checkpoint_delay * 1000
    timeout_sec = config.checkpoint_retries * config.checkpoint_delay
    cmd += ['--retry-delay-ms', str(retry_delay_ms), '--timeout-sec', str(timeout_sec)]
    cmd += target_zone.zone_args()
    target_zone.cluster.admin(cmd, debug_rgw=1)

def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
    for source_conn in zonegroup_conns.rw_zones:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            log.debug('bucket checkpoint: source=%s target=%s bucket=%s', source_conn.zone.name, target_conn.zone.name, bucket_name)
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket_name)
    for source_conn, target_conn in combinations(zonegroup_conns.zones, 2):
        if target_conn.zone.has_buckets():
            target_conn.check_bucket_eq(source_conn, bucket_name)

def set_master_zone(zone):
    zone.modify(zone.cluster, ['--master'])
    zonegroup = zone.zonegroup
    zonegroup.period.update(zone, commit=True)
    zonegroup.master_zone = zone
    log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay)
    time.sleep(config.reconfigure_delay)

def set_sync_from_all(zone, flag):
    s = 'true' if flag else 'false'
    zone.modify(zone.cluster, ['--sync-from-all={}'.format(s)])
    zonegroup = zone.zonegroup
    zonegroup.period.update(zone, commit=True)
    log.info('Set sync_from_all flag on zone %s to %s', zone.name, s)
    time.sleep(config.reconfigure_delay)

def set_redirect_zone(zone, redirect_zone):
    id_str = redirect_zone.id if redirect_zone else ''
    zone.modify(zone.cluster, ['--redirect-zone={}'.format(id_str)])
    zonegroup = zone.zonegroup
    zonegroup.period.update(zone, commit=True)
    log.info('Set redirect_zone zone %s to "%s"', zone.name, id_str)
    time.sleep(config.reconfigure_delay)

def enable_bucket_sync(zone, bucket_name):
    cmd = ['bucket', 'sync', 'enable', '--bucket', bucket_name] + zone.zone_args()
    zone.cluster.admin(cmd)

def disable_bucket_sync(zone, bucket_name):
    cmd = ['bucket', 'sync', 'disable', '--bucket', bucket_name] + zone.zone_args()
    zone.cluster.admin(cmd)
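
# Illustrative sketch (editorial addition): sync is toggled on the metadata
# master zone, and the change propagates to peers via metadata sync:
def _example_toggle_bucket_sync(bucket_name):
    disable_bucket_sync(realm.meta_master_zone(), bucket_name)
    enable_bucket_sync(realm.meta_master_zone(), bucket_name)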

def check_buckets_sync_status_obj_not_exist(zone, buckets):
    for _ in range(config.checkpoint_retries):
        cmd = ['log', 'list'] + zone.zone_arg()
        log_list, ret = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        for bucket in buckets:
            if log_list.find(':'+bucket+":") >= 0:
                break
        else:
            return
        time.sleep(config.checkpoint_delay)
    assert False

def gen_bucket_name():
    global num_buckets

    num_buckets += 1
    return run_prefix + '-' + str(num_buckets)

def gen_role_name():
    global num_roles

    num_roles += 1
    return "roles" + '-' + run_prefix + '-' + str(num_roles)

class ZonegroupConns:
    def __init__(self, zonegroup):
        self.zonegroup = zonegroup
        self.zones = []
        self.ro_zones = []
        self.rw_zones = []
        self.master_zone = None

        for z in zonegroup.zones:
            zone_conn = z.get_conn(user.credentials)
            self.zones.append(zone_conn)
            if z.is_read_only():
                self.ro_zones.append(zone_conn)
            else:
                self.rw_zones.append(zone_conn)

            if z == zonegroup.master_zone:
                self.master_zone = zone_conn
, buckets
):
476 if not zone_conn
.zone
.has_buckets():
481 zone_conn
.get_bucket(b
)
483 log
.critical('zone %s does not contain bucket %s', zone_conn
.zone
.name
, b
)
488 def check_all_buckets_dont_exist(zone_conn
, buckets
):
489 if not zone_conn
.zone
.has_buckets():
494 zone_conn
.get_bucket(b
)
498 log
.critical('zone %s contains bucket %s', zone
.zone
, b
)

def create_role_per_zone(zonegroup_conns, roles_per_zone=1):
    roles = []
    zone_role = []
    for zone in zonegroup_conns.rw_zones:
        for i in range(roles_per_zone):
            role_name = gen_role_name()
            log.info('create role zone=%s name=%s', zone.name, role_name)
            policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/testuser\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
            role = zone.create_role("", role_name, policy_document, "")
            roles.append(role_name)
            zone_role.append((zone, role))

    return roles, zone_role

def create_bucket_per_zone(zonegroup_conns, buckets_per_zone=1):
    buckets = []
    zone_bucket = []
    for zone in zonegroup_conns.rw_zones:
        for i in range(buckets_per_zone):
            bucket_name = gen_bucket_name()
            log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
            bucket = zone.create_bucket(bucket_name)
            buckets.append(bucket_name)
            zone_bucket.append((zone, bucket))

    return buckets, zone_bucket
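
# Illustrative sketch (editorial addition): the create_*_per_zone helpers
# return both a flat name list and (zone, object) pairs, e.g.:
def _example_setup(zonegroup):
    conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(conns, buckets_per_zone=2)
    for zone, bucket in zone_bucket:
        log.info('zone=%s owns bucket=%s', zone.name, bucket.name)
    return buckets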

def create_bucket_per_zone_in_realm():
    buckets = []
    zone_bucket = []
    for zonegroup in realm.current_period.zonegroups:
        zg_conn = ZonegroupConns(zonegroup)
        b, z = create_bucket_per_zone(zg_conn)
        buckets.extend(b)
        zone_bucket.extend(z)
    return buckets, zone_bucket

def test_bucket_create():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, _ = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

def test_bucket_recreate():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, _ = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

    # recreate buckets on all zones, make sure they weren't removed
    for zone in zonegroup_conns.rw_zones:
        for bucket_name in buckets:
            bucket = zone.create_bucket(bucket_name)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

def test_bucket_remove():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

    for zone, bucket_name in zone_bucket:
        zone.conn.delete_bucket(bucket_name)

    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_dont_exist(zone, buckets)

def get_bucket(zone, bucket_name):
    return zone.conn.get_bucket(bucket_name)

def get_key(zone, bucket_name, obj_name):
    b = get_bucket(zone, bucket_name)
    return b.get_key(obj_name)

def new_key(zone, bucket_name, obj_name):
    b = get_bucket(zone, bucket_name)
    return b.new_key(obj_name)

def check_bucket_eq(zone_conn1, zone_conn2, bucket):
    if zone_conn2.zone.has_buckets():
        zone_conn2.check_bucket_eq(zone_conn1, bucket.name)

def check_role_eq(zone_conn1, zone_conn2, role):
    if zone_conn2.zone.has_roles():
        zone_conn2.check_role_eq(zone_conn1, role['create_role_response']['create_role_result']['role']['role_name'])

def test_object_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = [ 'myobj', '_myobj', ':', '&' ]
    content = 'asdasd'

    # don't wait for meta sync just yet
    for zone, bucket_name in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket_name, objname)
            k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

def test_object_delete():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objname = 'myobj'
    content = 'asdasd'

    # don't wait for meta sync just yet
    for zone, bucket in zone_bucket:
        k = new_key(zone, bucket, objname)
        k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    # check object exists
    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

    # check object removal
    for source_conn, bucket in zone_bucket:
        k = get_key(source_conn, bucket, objname)
        k.delete()
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

def get_latest_object_version(key):
    for k in key.bucket.list_versions(key.name):
        if k.is_latest:
            return k
    return None
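
# Illustrative sketch (editorial addition): get_latest_object_version() scans
# a versioned listing for the entry flagged is_latest, e.g.:
def _example_latest_version_id(zone_conn, bucket_name, obj_name):
    k = new_key(zone_conn, bucket_name, obj_name)
    v = get_latest_object_version(k)
    return v.version_id if v else None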

def test_versioned_object_incremental_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    # enable versioning
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)

    zonegroup_meta_checkpoint(zonegroup)

    # upload a dummy object to each bucket and wait for sync. this forces each
    # bucket to finish a full sync and switch to incremental
    for source_conn, bucket in zone_bucket:
        new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)

    for _, bucket in zone_bucket:
        # create and delete multiple versions of an object from each zone
        for zone_conn in zonegroup_conns.rw_zones:
            obj = 'obj-' + zone_conn.name
            k = new_key(zone_conn, bucket, obj)

            k.set_contents_from_string('version1')
            log.debug('version1 id=%s', k.version_id)
            # don't delete version1 - this tests that the initial version
            # doesn't get squashed into later versions

            # create and delete the following object versions to test that
            # the operations don't race with each other during sync
            k.set_contents_from_string('version2')
            log.debug('version2 id=%s', k.version_id)
            k.bucket.delete_key(obj, version_id=k.version_id)

            k.set_contents_from_string('version3')
            log.debug('version3 id=%s', k.version_id)
            k.bucket.delete_key(obj, version_id=k.version_id)

    for _, bucket in zone_bucket:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

    for _, bucket in zone_bucket:
        # overwrite the acls to test that metadata-only entries are applied
        for zone_conn in zonegroup_conns.rw_zones:
            obj = 'obj-' + zone_conn.name
            k = new_key(zone_conn, bucket.name, obj)
            v = get_latest_object_version(k)
            v.make_public()

    for _, bucket in zone_bucket:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

def test_concurrent_versioned_object_incremental_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    zone = zonegroup_conns.rw_zones[0]

    # create a versioned bucket
    bucket = zone.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    bucket.configure_versioning(True)

    zonegroup_meta_checkpoint(zonegroup)

    # upload a dummy object and wait for sync. this forces each zone to finish
    # a full sync and switch to incremental
    new_key(zone, bucket, 'dummy').set_contents_from_string('')
    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

    # create several concurrent versions on each zone and let them race to sync
    obj = 'obj'
    for zone_conn in zonegroup_conns.rw_zones:
        k = new_key(zone_conn, bucket, obj)
        k.set_contents_from_string('version1')
        log.debug('zone=%s version=%s', zone_conn.zone.name, k.version_id)

    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
    zonegroup_data_checkpoint(zonegroup_conns)

def test_version_suspended_incremental_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    zone = zonegroup_conns.rw_zones[0]

    # create a non-versioned bucket
    bucket = zone.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(zonegroup)

    # upload an initial object
    key1 = new_key(zone, bucket, 'obj')
    key1.set_contents_from_string('')
    log.debug('created initial version id=%s', key1.version_id)
    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

    # enable versioning
    bucket.configure_versioning(True)
    zonegroup_meta_checkpoint(zonegroup)

    # re-upload the object as a new version
    key2 = new_key(zone, bucket, 'obj')
    key2.set_contents_from_string('')
    log.debug('created new version id=%s', key2.version_id)
    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

    # suspend versioning
    bucket.configure_versioning(False)
    zonegroup_meta_checkpoint(zonegroup)

    # re-upload the object as a 'null' version
    key3 = new_key(zone, bucket, 'obj')
    key3.set_contents_from_string('')
    log.debug('created null version id=%s', key3.version_id)
    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

def test_delete_marker_full_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    # enable versioning
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)
    zonegroup_meta_checkpoint(zonegroup)

    for zone, bucket in zone_bucket:
        # upload an initial object
        key1 = new_key(zone, bucket, 'obj')
        key1.set_contents_from_string('')

        # create a delete marker
        key2 = new_key(zone, bucket, 'obj')
        key2.delete()

    for _, bucket in zone_bucket:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

def test_suspended_delete_marker_full_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    # enable/suspend versioning
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)
        bucket.configure_versioning(False)
    zonegroup_meta_checkpoint(zonegroup)

    for zone, bucket in zone_bucket:
        # upload an initial object
        key1 = new_key(zone, bucket, 'obj')
        key1.set_contents_from_string('')

        # create a delete marker
        key2 = new_key(zone, bucket, 'obj')
        key2.delete()

    for _, bucket in zone_bucket:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

def test_bucket_versioning():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)
        res = bucket.get_versioning_status()
        key = 'Versioning'
        assert(key in res and res[key] == 'Enabled')

def test_bucket_acl():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
        bucket.set_acl('public-read')
        assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers

def test_bucket_cors():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        cors_cfg = CORSConfiguration()
        cors_cfg.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000)
        bucket.set_cors(cors_cfg)
        assert(bucket.get_cors().to_xml() == cors_cfg.to_xml())

def test_bucket_delete_notempty():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone_conn, bucket_name in zone_bucket:
        # upload an object to each bucket on its own zone
        conn = zone_conn.get_connection()
        bucket = conn.get_bucket(bucket_name)
        k = bucket.new_key('foo')
        k.set_contents_from_string('bar')
        # attempt to delete the bucket before this object can sync
        try:
            conn.delete_bucket(bucket_name)
        except boto.exception.S3ResponseError as e:
            assert(e.error_code == 'BucketNotEmpty')
            continue
        assert False # expected 409 BucketNotEmpty

    # assert that each bucket still exists on the master
    c1 = zonegroup_conns.master_zone.conn
    for _, bucket_name in zone_bucket:
        assert c1.get_bucket(bucket_name)

def test_multi_period_incremental_sync():
    zonegroup = realm.master_zonegroup()
    if len(zonegroup.zones) < 3:
        raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")

    # periods to include in mdlog comparison
    mdlog_periods = [realm.current_period.id]

    # create a bucket in each zone
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    zonegroup_meta_checkpoint(zonegroup)

    z1, z2, z3 = zonegroup.zones[0:3]
    assert(z1 == zonegroup.master_zone)

    # kill zone 3 gateways to freeze sync status to incremental in first period
    z3.stop()

    # change master to zone 2 -> period 2
    set_master_zone(z2)
    mdlog_periods += [realm.current_period.id]
    for zone_conn, _ in zone_bucket:
        if zone_conn.zone == z3:
            continue
        bucket_name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
        bucket = zone_conn.conn.create_bucket(bucket_name)
        buckets.append(bucket_name)

    # wait for zone 1 to sync
    zone_meta_checkpoint(z1)

    # change master back to zone 1 -> period 3
    set_master_zone(z1)
    mdlog_periods += [realm.current_period.id]
    for zone_conn, bucket_name in zone_bucket:
        if zone_conn.zone == z3:
            continue
        bucket_name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
        zone_conn.conn.create_bucket(bucket_name)
        buckets.append(bucket_name)

    # restart zone 3 gateway and wait for sync
    z3.start()
    zonegroup_meta_checkpoint(zonegroup)

    # verify that we end up with the same objects
    for bucket_name in buckets:
        for source_conn, _ in zone_bucket:
            for target_conn in zonegroup_conns.zones:
                if source_conn.zone == target_conn.zone:
                    continue

                if target_conn.zone.has_buckets():
                    target_conn.check_bucket_eq(source_conn, bucket_name)

    # verify that mdlogs are not empty and match for each period
    for period in mdlog_periods:
        master_mdlog = mdlog_list(z1, period)
        assert len(master_mdlog) > 0
        for zone in zonegroup.zones:
            if zone == z1:
                continue
            mdlog = mdlog_list(zone, period)
            assert len(mdlog) == len(master_mdlog)

    # autotrim mdlogs for master zone
    mdlog_autotrim(z1)

    # autotrim mdlogs for peers
    for zone in zonegroup.zones:
        if zone == z1:
            continue
        mdlog_autotrim(zone)

    # verify that mdlogs are empty for each period
    for period in mdlog_periods:
        for zone in zonegroup.zones:
            mdlog = mdlog_list(zone, period)
            assert len(mdlog) == 0

def test_datalog_autotrim():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    # upload an object to each zone to generate a datalog entry
    for zone, bucket in zone_bucket:
        k = new_key(zone, bucket.name, 'key')
        k.set_contents_from_string('body')

    # wait for metadata and data sync to catch up
    zonegroup_meta_checkpoint(zonegroup)
    zonegroup_data_checkpoint(zonegroup_conns)

    # trim each datalog
    for zone, _ in zone_bucket:
        # read max markers for each shard
        status = datalog_status(zone.zone)

        datalog_autotrim(zone.zone)

        for shard_id, shard_status in enumerate(status):
            try:
                before_trim = dateutil.parser.isoparse(shard_status['last_update'])
            except: # empty timestamps look like "0.000000" and will fail here
                continue
            entries = datalog_list(zone.zone, ['--shard-id', str(shard_id), '--max-entries', '1'])
            if not len(entries):
                continue
            after_trim = dateutil.parser.isoparse(entries[0]['timestamp'])
            assert before_trim < after_trim, "any datalog entries must be newer than trim"

def test_multi_zone_redirect():
    zonegroup = realm.master_zonegroup()
    if len(zonegroup.rw_zones) < 2:
        raise SkipTest("test_multi_zone_redirect skipped. Requires 2 or more read-write zones in master zonegroup.")

    zonegroup_conns = ZonegroupConns(zonegroup)
    (zc1, zc2) = zonegroup_conns.rw_zones[0:2]

    z1, z2 = (zc1.zone, zc2.zone)
    set_sync_from_all(z2, False)

    # create a bucket on the first zone
    bucket_name = gen_bucket_name()
    log.info('create bucket zone=%s name=%s', z1.name, bucket_name)
    bucket = zc1.conn.create_bucket(bucket_name)
    obj = 'testredirect'

    key = bucket.new_key(obj)
    data = 'A'*512
    key.set_contents_from_string(data)

    zonegroup_meta_checkpoint(zonegroup)

    # try to read object from second zone (should fail)
    bucket2 = get_bucket(zc2, bucket_name)
    assert_raises(boto.exception.S3ResponseError, bucket2.get_key, obj)

    set_redirect_zone(z2, z1)

    key2 = bucket2.get_key(obj)
    eq(data, key2.get_contents_as_string(encoding='ascii'))

    key = bucket.new_key(obj)
    for x in ['a', 'b', 'c', 'd']:
        data = x*512
        key.set_contents_from_string(data)
        eq(data, key2.get_contents_as_string(encoding='ascii'))

    # revert config changes
    set_sync_from_all(z2, True)
    set_redirect_zone(z2, None)

def test_zonegroup_remove():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    if len(zonegroup.zones) < 2:
        raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")

    zonegroup_meta_checkpoint(zonegroup)
    z1, z2 = zonegroup.zones[0:2]
    c1, c2 = (z1.cluster, z2.cluster)

    # get admin credentials out of existing zone
    system_key = z1.data['system_key']
    admin_creds = Credentials(system_key['access_key'], system_key['secret_key'])

    # create a new zone in zonegroup on c2 and commit
    zone = Zone('remove', zonegroup, c2)
    zone.create(c2, admin_creds.credential_args())
    zonegroup.zones.append(zone)
    zonegroup.period.update(zone, commit=True)

    zonegroup.remove(c1, zone)

    # another 'zonegroup remove' should fail with ENOENT
    _, retcode = zonegroup.remove(c1, zone, check_retcode=False)
    assert(retcode == 2) # ENOENT

    # delete the new zone
    zone.delete(c2)

    # validate the resulting period
    zonegroup.period.update(z1, commit=True)

def test_zg_master_zone_delete():
    master_zg = realm.master_zonegroup()
    master_zone = master_zg.master_zone

    assert(len(master_zg.zones) >= 1)
    master_cluster = master_zg.zones[0].cluster

    rm_zg = ZoneGroup('remove_zg')
    rm_zg.create(master_cluster)

    rm_zone = Zone('remove', rm_zg, master_cluster)
    rm_zone.create(master_cluster)
    master_zg.period.update(master_zone, commit=True)

    # delete the new zone
    rm_zone.delete(master_cluster)

    # Period update: This should now fail as the zone will be the master zone
    # in that zonegroup
    _, retcode = master_zg.period.update(master_zone, check_retcode=False)
    assert(retcode == errno.EINVAL)

    # Proceed to delete the zonegroup as well, previous period now does not
    # contain a dangling master_zone, this must succeed
    rm_zg.delete(master_cluster)
    master_zg.period.update(master_zone, commit=True)

def test_set_bucket_website():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        website_cfg = WebsiteConfiguration(suffix='index.html', error_key='error.html')
        try:
            bucket.set_website_configuration(website_cfg)
        except boto.exception.S3ResponseError as e:
            if e.error_code == 'MethodNotAllowed':
                raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.")
        assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml())

def test_set_bucket_policy():
    policy = '''{
"Version": "2012-10-17",
"Statement": [{
    "Effect": "Allow",
    "Principal": "*"
}]
}'''
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        bucket.set_policy(policy)
        assert(bucket.get_policy().decode('ascii') == policy)

@attr('bucket_sync_disable')
def test_bucket_sync_disable():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)

    for zone in zonegroup.zones:
        check_buckets_sync_status_obj_not_exist(zone, buckets)

    zonegroup_data_checkpoint(zonegroup_conns)

@attr('bucket_sync_disable')
def test_bucket_sync_enable_right_after_disable():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = ['obj1', 'obj2', 'obj3', 'obj4']
    content = 'asdasd'

    for zone, bucket in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)
        enable_bucket_sync(realm.meta_master_zone(), bucket_name)

    objnames_2 = ['obj5', 'obj6', 'obj7', 'obj8']

    for zone, bucket in zone_bucket:
        for objname in objnames_2:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

    zonegroup_data_checkpoint(zonegroup_conns)

@attr('bucket_sync_disable')
def test_bucket_sync_disable_enable():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = [ 'obj1', 'obj2', 'obj3', 'obj4' ]
    content = 'asdasd'

    for zone, bucket in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)

    zonegroup_meta_checkpoint(zonegroup)

    objnames_2 = [ 'obj5', 'obj6', 'obj7', 'obj8' ]

    for zone, bucket in zone_bucket:
        for objname in objnames_2:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    for bucket_name in buckets:
        enable_bucket_sync(realm.meta_master_zone(), bucket_name)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

    zonegroup_data_checkpoint(zonegroup_conns)

def test_multipart_object_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    _, bucket = zone_bucket[0]

    # initiate a multipart upload
    upload = bucket.initiate_multipart_upload('MULTIPART')
    mp = boto.s3.multipart.MultiPartUpload(bucket)
    mp.key_name = upload.key_name
    mp.id = upload.id
    part_size = 5 * 1024 * 1024 # 5M min part size
    mp.upload_part_from_file(StringIO('a' * part_size), 1)
    mp.upload_part_from_file(StringIO('b' * part_size), 2)
    mp.upload_part_from_file(StringIO('c' * part_size), 3)
    mp.upload_part_from_file(StringIO('d' * part_size), 4)
    mp.complete_upload()

    zonegroup_meta_checkpoint(zonegroup)
    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

def test_encrypted_object_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    if len(zonegroup.rw_zones) < 2:
        raise SkipTest("test_encrypted_object_sync skipped. Requires 2 or more zones in master zonegroup.")

    (zone1, zone2) = zonegroup_conns.rw_zones[0:2]

    # create a bucket on the first zone
    bucket_name = gen_bucket_name()
    log.info('create bucket zone=%s name=%s', zone1.name, bucket_name)
    bucket = zone1.conn.create_bucket(bucket_name)

    # upload an object with sse-c encryption
    sse_c_headers = {
        'x-amz-server-side-encryption-customer-algorithm': 'AES256',
        'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
        'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
    }
    key = bucket.new_key('testobj-sse-c')
    data = 'A'*512
    key.set_contents_from_string(data, headers=sse_c_headers)

    # upload an object with sse-kms encryption
    sse_kms_headers = {
        'x-amz-server-side-encryption': 'aws:kms',
        # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this)
        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
    }
    key = bucket.new_key('testobj-sse-kms')
    key.set_contents_from_string(data, headers=sse_kms_headers)

    # wait for the bucket metadata and data to sync
    zonegroup_meta_checkpoint(zonegroup)
    zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name)

    # read the encrypted objects from the second zone
    bucket2 = get_bucket(zone2, bucket_name)
    key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers)
    eq(data, key.get_contents_as_string(headers=sse_c_headers, encoding='ascii'))

    key = bucket2.get_key('testobj-sse-kms')
    eq(data, key.get_contents_as_string(encoding='ascii'))

def test_bucket_index_log_trim():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    zone = zonegroup_conns.rw_zones[0]

    # create a test bucket, upload some objects, and wait for sync
    def make_test_bucket():
        name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone.name, name)
        bucket = zone.conn.create_bucket(name)
        for objname in ('a', 'b', 'c', 'd'):
            k = new_key(zone, name, objname)
            k.set_contents_from_string('foo')
        zonegroup_meta_checkpoint(zonegroup)
        zonegroup_bucket_checkpoint(zonegroup_conns, name)
        return bucket

    # create a 'cold' bucket
    cold_bucket = make_test_bucket()

    # trim with max-buckets=0 to clear counters for cold bucket. this should
    # prevent it from being considered 'active' by the next autotrim
    bilog_autotrim(zone.zone, [
        '--rgw-sync-log-trim-max-buckets', '0',
    ])

    # create an 'active' bucket
    active_bucket = make_test_bucket()

    # trim with max-buckets=1 min-cold-buckets=0 to trim active bucket only
    bilog_autotrim(zone.zone, [
        '--rgw-sync-log-trim-max-buckets', '1',
        '--rgw-sync-log-trim-min-cold-buckets', '0',
    ])

    # verify active bucket has empty bilog
    active_bilog = bilog_list(zone.zone, active_bucket.name)
    assert(len(active_bilog) == 0)

    # verify cold bucket has nonempty bilog
    cold_bilog = bilog_list(zone.zone, cold_bucket.name)
    assert(len(cold_bilog) > 0)

    # trim with min-cold-buckets=999 to trim all buckets
    bilog_autotrim(zone.zone, [
        '--rgw-sync-log-trim-max-buckets', '999',
        '--rgw-sync-log-trim-min-cold-buckets', '999',
    ])

    # verify cold bucket has empty bilog
    cold_bilog = bilog_list(zone.zone, cold_bucket.name)
    assert(len(cold_bilog) == 0)

def test_bucket_reshard_index_log_trim():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    zone = zonegroup_conns.rw_zones[0]

    # create a test bucket, upload some objects, and wait for sync
    def make_test_bucket():
        name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone.name, name)
        bucket = zone.conn.create_bucket(name)
        for objname in ('a', 'b', 'c', 'd'):
            k = new_key(zone, name, objname)
            k.set_contents_from_string('foo')
        zonegroup_meta_checkpoint(zonegroup)
        zonegroup_bucket_checkpoint(zonegroup_conns, name)
        return bucket

    # create a 'test' bucket
    test_bucket = make_test_bucket()

    # checking bucket layout before resharding
    json_obj_1 = bucket_layout(zone.zone, test_bucket.name)
    assert(len(json_obj_1['layout']['logs']) == 1)

    first_gen = json_obj_1['layout']['current_index']['gen']

    before_reshard_bilog = bilog_list(zone.zone, test_bucket.name, ['--gen', str(first_gen)])
    assert(len(before_reshard_bilog) == 4)

    # Resharding the bucket
    zone.zone.cluster.admin(['bucket', 'reshard',
                             '--bucket', test_bucket.name,
                             '--num-shards', '3',
                             '--yes-i-really-mean-it'])

    # checking bucket layout after 1st resharding
    json_obj_2 = bucket_layout(zone.zone, test_bucket.name)
    assert(len(json_obj_2['layout']['logs']) == 2)

    second_gen = json_obj_2['layout']['current_index']['gen']

    after_reshard_bilog = bilog_list(zone.zone, test_bucket.name, ['--gen', str(second_gen)])
    assert(len(after_reshard_bilog) == 0)

    # upload more objects
    for objname in ('e', 'f', 'g', 'h'):
        k = new_key(zone, test_bucket.name, objname)
        k.set_contents_from_string('foo')
    zonegroup_bucket_checkpoint(zonegroup_conns, test_bucket.name)

    # Resharding the bucket again
    zone.zone.cluster.admin(['bucket', 'reshard',
                             '--bucket', test_bucket.name,
                             '--num-shards', '3',
                             '--yes-i-really-mean-it'])

    # checking bucket layout after 2nd resharding
    json_obj_3 = bucket_layout(zone.zone, test_bucket.name)
    assert(len(json_obj_3['layout']['logs']) == 3)

    zonegroup_bucket_checkpoint(zonegroup_conns, test_bucket.name)

    bilog_autotrim(zone.zone)

    # checking bucket layout after 1st bilog autotrim
    json_obj_4 = bucket_layout(zone.zone, test_bucket.name)
    assert(len(json_obj_4['layout']['logs']) == 2)

    bilog_autotrim(zone.zone)

    # checking bucket layout after 2nd bilog autotrim
    json_obj_5 = bucket_layout(zone.zone, test_bucket.name)
    assert(len(json_obj_5['layout']['logs']) == 1)

    bilog_autotrim(zone.zone)

    # upload more objects
    for objname in ('i', 'j', 'k', 'l'):
        k = new_key(zone, test_bucket.name, objname)
        k.set_contents_from_string('foo')
    zonegroup_bucket_checkpoint(zonegroup_conns, test_bucket.name)

    # verify the bucket has non-empty bilog
    test_bilog = bilog_list(zone.zone, test_bucket.name)
    assert(len(test_bilog) > 0)

@attr('bucket_reshard')
def test_bucket_reshard_incremental():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    zone = zonegroup_conns.rw_zones[0]

    # create a bucket
    bucket = zone.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(zonegroup)

    # upload some objects
    for objname in ('a', 'b', 'c', 'd'):
        k = new_key(zone, bucket.name, objname)
        k.set_contents_from_string('foo')
    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

    # reshard in each zone
    for z in zonegroup_conns.rw_zones:
        z.zone.cluster.admin(['bucket', 'reshard',
                              '--bucket', bucket.name,
                              '--num-shards', '3',
                              '--yes-i-really-mean-it'])

    # upload more objects
    for objname in ('e', 'f', 'g', 'h'):
        k = new_key(zone, bucket.name, objname)
        k.set_contents_from_string('foo')
    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

@attr('bucket_reshard')
def test_bucket_reshard_full():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    zone = zonegroup_conns.rw_zones[0]

    # create a bucket
    bucket = zone.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(zonegroup)

    # stop gateways in other zones so we can force the bucket to full sync
    for z in zonegroup_conns.rw_zones[1:]:
        z.zone.stop()

    # use try-finally to restart gateways even if something fails
    try:
        # upload some objects
        for objname in ('a', 'b', 'c', 'd'):
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string('foo')

        # reshard on first zone
        zone.zone.cluster.admin(['bucket', 'reshard',
                                 '--bucket', bucket.name,
                                 '--num-shards', '3',
                                 '--yes-i-really-mean-it'])

        # upload more objects
        for objname in ('e', 'f', 'g', 'h'):
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string('foo')
    finally:
        for z in zonegroup_conns.rw_zones[1:]:
            z.zone.start()

    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

def test_bucket_creation_time():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    zonegroup_meta_checkpoint(zonegroup)

    zone_buckets = [zone.get_connection().get_all_buckets() for zone in zonegroup_conns.rw_zones]
    for z1, z2 in combinations(zone_buckets, 2):
        for a, b in zip(z1, z2):
            eq(a.name, b.name)
            eq(a.creation_date, b.creation_date)

def get_bucket_shard_objects(zone, num_shards):
    """
    Get one object for each shard of the bucket index log
    """
    cmd = ['bucket', 'shard', 'objects'] + zone.zone_args()
    cmd += ['--num-shards', str(num_shards)]
    shardobjs_json, ret = zone.cluster.admin(cmd, read_only=True)
    shardobjs = json.loads(shardobjs_json)
    return shardobjs['objs']
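
# Illustrative sketch (editorial addition): the shard-object names returned
# above let a test touch index shards deterministically, e.g.:
def _example_touch_every_shard(zone_conn, bucket_name, num_shards):
    for obj in get_bucket_shard_objects(zone_conn.zone, num_shards):
        new_key(zone_conn, bucket_name, obj).set_contents_from_string('x')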

def write_most_shards(zone, bucket_name, num_shards):
    """
    Write one object to most (but not all) bucket index shards.
    """
    objs = get_bucket_shard_objects(zone.zone, num_shards)
    random.shuffle(objs)
    del objs[-(len(objs)//10):]
    for obj in objs:
        k = new_key(zone, bucket_name, obj)
        k.set_contents_from_string('foo')

def reshard_bucket(zone, bucket_name, num_shards):
    """
    Reshard a bucket
    """
    cmd = ['bucket', 'reshard'] + zone.zone_args()
    cmd += ['--bucket', bucket_name]
    cmd += ['--num-shards', str(num_shards)]
    cmd += ['--yes-i-really-mean-it']
    zone.cluster.admin(cmd)
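
# Illustrative sketch (editorial addition): each reshard bumps the bucket
# index generation, which bucket_layout() exposes:
def _example_reshard_bumps_generation(zone, bucket_name):
    before = bucket_layout(zone, bucket_name)['layout']['current_index']['gen']
    reshard_bucket(zone, bucket_name, 5)
    after = bucket_layout(zone, bucket_name)['layout']['current_index']['gen']
    assert after > before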

def get_obj_names(zone, bucket_name, maxobjs):
    """
    Get names of objects in a bucket.
    """
    cmd = ['bucket', 'list'] + zone.zone_args()
    cmd += ['--bucket', bucket_name]
    cmd += ['--max-entries', str(maxobjs)]
    objs_json, _ = zone.cluster.admin(cmd, read_only=True)
    objs = json.loads(objs_json)
    return [o['name'] for o in objs]

def bucket_keys_eq(zone1, zone2, bucket_name):
    """
    Ensure that two buckets have the same keys, but get the lists through
    radosgw-admin rather than S3 so it can be used when radosgw isn't running.
    Only works for buckets of 10,000 objects since the tests calling it don't
    need more, and the output from bucket list doesn't have an obvious marker
    with which to continue.
    """
    keys1 = get_obj_names(zone1, bucket_name, 10000)
    keys2 = get_obj_names(zone2, bucket_name, 10000)
    for key1, key2 in zip_longest(keys1, keys2):
        if key2 is None:
            log.critical('key=%s is missing from zone=%s', key1,
                         zone2.name)
            assert False
        if key1 is None:
            log.critical('key=%s is missing from zone=%s', key2,
                         zone1.name)
            assert False
        eq(key1, key2)

@attr('bucket_reshard')
def test_bucket_sync_run_basic_incremental():
    """
    Create several generations of objects, then run bucket sync
    run to ensure they're all processed.
    """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    primary = zonegroup_conns.rw_zones[0]

    # create a bucket write objects to it and wait for them to sync, ensuring
    # we are in incremental.
    bucket = primary.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(zonegroup)
    write_most_shards(primary, bucket.name, 11)
    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

    try:
        # stop gateways in other zones so we can rely on bucket sync run
        for secondary in zonegroup_conns.rw_zones[1:]:
            secondary.zone.stop()

        # build up multiple generations each with some objects written to
        # them.
        generations = [17, 19, 23, 29, 31, 37]
        for num_shards in generations:
            reshard_bucket(primary.zone, bucket.name, num_shards)
            write_most_shards(primary, bucket.name, num_shards)

        # bucket sync run on every secondary
        for secondary in zonegroup_conns.rw_zones[1:]:
            cmd = ['bucket', 'sync', 'run'] + secondary.zone.zone_args()
            cmd += ['--bucket', bucket.name, '--source-zone', primary.name]
            secondary.zone.cluster.admin(cmd)

            bucket_keys_eq(primary.zone, secondary.zone, bucket.name)
    finally:
        # Restart so bucket_checkpoint can actually fetch things from the
        # secondaries. Put this in a finally block so they restart even on
        # error.
        for secondary in zonegroup_conns.rw_zones[1:]:
            secondary.zone.start()

    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

def trash_bucket(zone, bucket_name):
    """
    Remove objects through radosgw-admin, zapping bilog to prevent the deletes
    from being synced.
    """
    objs = get_obj_names(zone, bucket_name, 10000)
    # Delete the objects
    for obj in objs:
        cmd = ['object', 'rm'] + zone.zone_args()
        cmd += ['--bucket', bucket_name]
        cmd += ['--object', obj]
        zone.cluster.admin(cmd)

    # Zap the bilog
    cmd = ['bilog', 'trim'] + zone.zone_args()
    cmd += ['--bucket', bucket_name]
    zone.cluster.admin(cmd)
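
# Illustrative sketch (editorial addition): after trash_bucket() leaves a
# zone divergent, the tests below repair it with 'bucket sync init' followed
# by 'bucket sync run' against the source zone:
def _example_resync_from(secondary, primary, bucket_name):
    for subcmd in ('init', 'run'):
        cmd = ['bucket', 'sync', subcmd] + secondary.zone.zone_args()
        cmd += ['--bucket', bucket_name, '--source-zone', primary.name]
        secondary.zone.cluster.admin(cmd)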

@attr('bucket_reshard')
def test_zap_init_bucket_sync_run():
    """
    Create several generations of objects, trash them, then run bucket sync init
    and bucket sync run.
    """
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    primary = zonegroup_conns.rw_zones[0]

    bucket = primary.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(zonegroup)

    # Write zeroth generation
    for obj in range(1, 6):
        k = new_key(primary, bucket.name, f'obj{obj * 11}')
        k.set_contents_from_string('foo')
    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

    # Write several more generations
    generations = [17, 19, 23, 29, 31, 37]
    for num_shards in generations:
        reshard_bucket(primary.zone, bucket.name, num_shards)
        for obj in range(1, 6):
            k = new_key(primary, bucket.name, f'obj{obj * num_shards}')
            k.set_contents_from_string('foo')
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

    # Stop gateways, trash bucket, init, sync, and restart for every secondary
    for secondary in zonegroup_conns.rw_zones[1:]:
        try:
            secondary.zone.stop()

            trash_bucket(secondary.zone, bucket.name)

            cmd = ['bucket', 'sync', 'init'] + secondary.zone.zone_args()
            cmd += ['--bucket', bucket.name]
            cmd += ['--source-zone', primary.name]
            secondary.zone.cluster.admin(cmd)

            cmd = ['bucket', 'sync', 'run'] + secondary.zone.zone_args()
            cmd += ['--bucket', bucket.name, '--source-zone', primary.name]
            secondary.zone.cluster.admin(cmd)

            bucket_keys_eq(primary.zone, secondary.zone, bucket.name)
        finally:
            # Do this as a finally so we bring the zone back up even on error.
            secondary.zone.start()

    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

def test_role_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    roles, zone_role = create_role_per_zone(zonegroup_conns)

    zonegroup_meta_checkpoint(zonegroup)

    for source_conn, role in zone_role:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            check_role_eq(source_conn, target_conn, role)

@attr('data_sync_init')
def test_bucket_full_sync_after_data_sync_init():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    primary = zonegroup_conns.rw_zones[0]
    secondary = zonegroup_conns.rw_zones[1]

    bucket = primary.create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(zonegroup)

    try:
        # stop secondary zone before it starts a bucket full sync
        secondary.zone.stop()

        # write some objects that don't sync yet
        for obj in range(1, 6):
            k = new_key(primary, bucket.name, f'obj{obj * 11}')
            k.set_contents_from_string('foo')

        cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
        cmd += ['--source-zone', primary.name]
        secondary.zone.cluster.admin(cmd)
    finally:
        # Do this as a finally so we bring the zone back up even on error.
        secondary.zone.start()

    # expect all objects to replicate via 'bucket full sync'
    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
    zonegroup_data_checkpoint(zonegroup_conns)
1739 @attr('data_sync_init')
1740 @attr('bucket_reshard')
1741 def test_resharded_bucket_full_sync_after_data_sync_init():
1742 zonegroup
= realm
.master_zonegroup()
1743 zonegroup_conns
= ZonegroupConns(zonegroup
)
1744 primary
= zonegroup_conns
.rw_zones
[0]
1745 secondary
= zonegroup_conns
.rw_zones
[1]
1747 bucket
= primary
.create_bucket(gen_bucket_name())
1748 log
.debug('created bucket=%s', bucket
.name
)
1749 zonegroup_meta_checkpoint(zonegroup
)
1752 # stop secondary zone before it starts a bucket full sync
1753 secondary
.zone
.stop()
1755 # Write zeroth generation
1756 for obj
in range(1, 6):
1757 k
= new_key(primary
, bucket
.name
, f
'obj{obj * 11}')
1758 k
.set_contents_from_string('foo')
1760 # Write several more generations
1761 generations
= [17, 19, 23, 29, 31, 37]
1762 for num_shards
in generations
:
1763 reshard_bucket(primary
.zone
, bucket
.name
, num_shards
)
1764 for obj
in range(1, 6):
1765 k
= new_key(primary
, bucket
.name
, f
'obj{obj * num_shards}')
1766 k
.set_contents_from_string('foo')
1768 cmd
= ['data', 'sync', 'init'] + secondary
.zone
.zone_args()
1769 cmd
+= ['--source-zone', primary
.name
]
1770 secondary
.zone
.cluster
.admin(cmd
)
1772 # Do this as a finally so we bring the zone back up even on error.
1773 secondary
.zone
.start()
1775 # expect all objects to replicate via 'bucket full sync'
1776 zonegroup_bucket_checkpoint(zonegroup_conns
, bucket
.name
)
1777 zonegroup_data_checkpoint(zonegroup_conns
)
1779 @attr('data_sync_init')
1780 def test_bucket_incremental_sync_after_data_sync_init():
1781 zonegroup
= realm
.master_zonegroup()
1782 zonegroup_conns
= ZonegroupConns(zonegroup
)
1783 primary
= zonegroup_conns
.rw_zones
[0]
1784 secondary
= zonegroup_conns
.rw_zones
[1]
1786 bucket
= primary
.create_bucket(gen_bucket_name())
1787 log
.debug('created bucket=%s', bucket
.name
)
1788 zonegroup_meta_checkpoint(zonegroup
)
1790 # upload a dummy object and wait for sync. this forces each zone to finish
1791 # a full sync and switch to incremental
1792 k
= new_key(primary
, bucket
, 'dummy')
1793 k
.set_contents_from_string('foo')
1794 zonegroup_bucket_checkpoint(zonegroup_conns
, bucket
.name
)
1797 # stop secondary zone before it syncs the rest
1798 secondary
.zone
.stop()
1800 # Write more objects to primary
1801 for obj
in range(1, 6):
1802 k
= new_key(primary
, bucket
.name
, f
'obj{obj * 11}')
1803 k
.set_contents_from_string('foo')
1805 cmd
= ['data', 'sync', 'init'] + secondary
.zone
.zone_args()
1806 cmd
+= ['--source-zone', primary
.name
]
1807 secondary
.zone
.cluster
.admin(cmd
)
1809 # Do this as a finally so we bring the zone back up even on error.
1810 secondary
.zone
.start()
1812 # expect remaining objects to replicate via 'bucket incremental sync'
1813 zonegroup_bucket_checkpoint(zonegroup_conns
, bucket
.name
)
1814 zonegroup_data_checkpoint(zonegroup_conns
)
1816 @attr('data_sync_init')
1817 @attr('bucket_reshard')
1818 def test_resharded_bucket_incremental_sync_latest_after_data_sync_init():
1819 zonegroup
= realm
.master_zonegroup()
1820 zonegroup_conns
= ZonegroupConns(zonegroup
)
1821 primary
= zonegroup_conns
.rw_zones
[0]
1822 secondary
= zonegroup_conns
.rw_zones
[1]
1824 bucket
= primary
.create_bucket(gen_bucket_name())
1825 log
.debug('created bucket=%s', bucket
.name
)
1826 zonegroup_meta_checkpoint(zonegroup
)
1828 # Write zeroth generation to primary
1829 for obj
in range(1, 6):
1830 k
= new_key(primary
, bucket
.name
, f
'obj{obj * 11}')
1831 k
.set_contents_from_string('foo')
1833 # Write several more generations
1834 generations
= [17, 19, 23, 29, 31, 37]
1835 for num_shards
in generations
:
1836 reshard_bucket(primary
.zone
, bucket
.name
, num_shards
)
1837 for obj
in range(1, 6):
1838 k
= new_key(primary
, bucket
.name
, f
'obj{obj * num_shards}')
1839 k
.set_contents_from_string('foo')
1841 # wait for the secondary to catch up to the latest gen
1842 zonegroup_bucket_checkpoint(zonegroup_conns
, bucket
.name
)
1845 # stop secondary zone before it syncs the rest
1846 secondary
.zone
.stop()
1848 # write some more objects to the last gen
1849 for obj
in range(1, 6):
1850 k
= new_key(primary
, bucket
.name
, f
'obj{obj * generations[-1]}')
1851 k
.set_contents_from_string('foo')
1853 cmd
= ['data', 'sync', 'init'] + secondary
.zone
.zone_args()
1854 cmd
+= ['--source-zone', primary
.name
]
1855 secondary
.zone
.cluster
.admin(cmd
)
1857 # Do this as a finally so we bring the zone back up even on error.
1858 secondary
.zone
.start()
1860 # expect remaining objects in last gen to replicate via 'bucket incremental sync'
1861 zonegroup_bucket_checkpoint(zonegroup_conns
, bucket
.name
)
1862 zonegroup_data_checkpoint(zonegroup_conns
)
1864 @attr('data_sync_init')
1865 @attr('bucket_reshard')
1866 def test_resharded_bucket_incremental_sync_oldest_after_data_sync_init():
1867 zonegroup
= realm
.master_zonegroup()
1868 zonegroup_conns
= ZonegroupConns(zonegroup
)
1869 primary
= zonegroup_conns
.rw_zones
[0]
1870 secondary
= zonegroup_conns
.rw_zones
[1]
1872 bucket
= primary
.create_bucket(gen_bucket_name())
1873 log
.debug('created bucket=%s', bucket
.name
)
1874 zonegroup_meta_checkpoint(zonegroup
)
1876 # Write zeroth generation to primary
1877 for obj
in range(1, 6):
1878 k
= new_key(primary
, bucket
.name
, f
'obj{obj * 11}')
1879 k
.set_contents_from_string('foo')
1881 # wait for the secondary to catch up
1882 zonegroup_bucket_checkpoint(zonegroup_conns
, bucket
.name
)
1885 # stop secondary zone before it syncs later generations
1886 secondary
.zone
.stop()
1888 # Write several more generations
1889 generations
= [17, 19, 23, 29, 31, 37]
1890 for num_shards
in generations
:
1891 reshard_bucket(primary
.zone
, bucket
.name
, num_shards
)
1892 for obj
in range(1, 6):
1893 k
= new_key(primary
, bucket
.name
, f
'obj{obj * num_shards}')
1894 k
.set_contents_from_string('foo')
1896 cmd
= ['data', 'sync', 'init'] + secondary
.zone
.zone_args()
1897 cmd
+= ['--source-zone', primary
.name
]
1898 secondary
.zone
.cluster
.admin(cmd
)
1900 # Do this as a finally so we bring the zone back up even on error.
1901 secondary
.zone
.start()
1903 # expect all generations to replicate via 'bucket incremental sync'
1904 zonegroup_bucket_checkpoint(zonegroup_conns
, bucket
.name
)
1905 zonegroup_data_checkpoint(zonegroup_conns
)
1907 def sync_info(cluster
, bucket
= None):
1908 cmd
= ['sync', 'info']
1910 cmd
+= ['--bucket', bucket
]
1911 (result_json
, retcode
) = cluster
.admin(cmd
)
1913 assert False, 'failed to get sync policy'
1915 return json
.loads(result_json
)
1917 def get_sync_policy(cluster
, bucket
= None):
1918 cmd
= ['sync', 'policy', 'get']
1920 cmd
+= ['--bucket', bucket
]
1921 (result_json
, retcode
) = cluster
.admin(cmd
)
1923 assert False, 'failed to get sync policy'
1925 return json
.loads(result_json
)
1927 def create_sync_policy_group(cluster
, group
, status
= "allowed", bucket
= None):
1928 cmd
= ['sync', 'group', 'create', '--group-id', group
, '--status' , status
]
1930 cmd
+= ['--bucket', bucket
]
1931 (result_json
, retcode
) = cluster
.admin(cmd
)
1933 assert False, 'failed to create sync policy group id=%s, bucket=%s' % (group
, bucket
)
1934 return json
.loads(result_json
)
1936 def set_sync_policy_group_status(cluster
, group
, status
, bucket
= None):
1937 cmd
= ['sync', 'group', 'modify', '--group-id', group
, '--status' , status
]
1939 cmd
+= ['--bucket', bucket
]
1940 (result_json
, retcode
) = cluster
.admin(cmd
)
1942 assert False, 'failed to set sync policy group id=%s, bucket=%s' % (group
, bucket
)
1943 return json
.loads(result_json
)
1945 def get_sync_policy_group(cluster
, group
, bucket
= None):
1946 cmd
= ['sync', 'group', 'get', '--group-id', group
]
1948 cmd
+= ['--bucket', bucket
]
1949 (result_json
, retcode
) = cluster
.admin(cmd
)
1951 assert False, 'failed to get sync policy group id=%s, bucket=%s' % (group
, bucket
)
1952 return json
.loads(result_json
)
1954 def remove_sync_policy_group(cluster
, group
, bucket
= None):
1955 cmd
= ['sync', 'group', 'remove', '--group-id', group
]
1957 cmd
+= ['--bucket', bucket
]
1958 (result_json
, retcode
) = cluster
.admin(cmd
)
1960 assert False, 'failed to remove sync policy group id=%s, bucket=%s' % (group
, bucket
)
1961 return json
.loads(result_json
)
1963 def create_sync_group_flow_symmetrical(cluster
, group
, flow_id
, zones
, bucket
= None):
1964 cmd
= ['sync', 'group', 'flow', 'create', '--group-id', group
, '--flow-id' , flow_id
, '--flow-type', 'symmetrical', '--zones=%s' % zones
]
1966 cmd
+= ['--bucket', bucket
]
1967 (result_json
, retcode
) = cluster
.admin(cmd
)
1969 assert False, 'failed to create sync group flow symmetrical groupid=%s, flow_id=%s, zones=%s, bucket=%s' % (group
, flow_id
, zones
, bucket
)
1970 return json
.loads(result_json
)
1972 def create_sync_group_flow_directional(cluster
, group
, flow_id
, src_zones
, dest_zones
, bucket
= None):
1973 cmd
= ['sync', 'group', 'flow', 'create', '--group-id', group
, '--flow-id' , flow_id
, '--flow-type', 'directional', '--source-zone=%s' % src_zones
, '--dest-zone=%s' % dest_zones
]
1975 cmd
+= ['--bucket', bucket
]
1976 (result_json
, retcode
) = cluster
.admin(cmd
)
1978 assert False, 'failed to create sync group flow directional groupid=%s, flow_id=%s, src_zones=%s, dest_zones=%s, bucket=%s' % (group
, flow_id
, src_zones
, dest_zones
, bucket
)
1979 return json
.loads(result_json
)
1981 def remove_sync_group_flow_symmetrical(cluster
, group
, flow_id
, zones
= None, bucket
= None):
1982 cmd
= ['sync', 'group', 'flow', 'remove', '--group-id', group
, '--flow-id' , flow_id
, '--flow-type', 'symmetrical']
1984 cmd
+= ['--zones=%s' % zones
]
1986 cmd
+= ['--bucket', bucket
]
1987 (result_json
, retcode
) = cluster
.admin(cmd
)
1989 assert False, 'failed to remove sync group flow symmetrical groupid=%s, flow_id=%s, zones=%s, bucket=%s' % (group
, flow_id
, zones
, bucket
)
1990 return json
.loads(result_json
)
1992 def remove_sync_group_flow_directional(cluster
, group
, flow_id
, src_zones
, dest_zones
, bucket
= None):
1993 cmd
= ['sync', 'group', 'flow', 'remove', '--group-id', group
, '--flow-id' , flow_id
, '--flow-type', 'directional', '--source-zone=%s' % src_zones
, '--dest-zone=%s' % dest_zones
]
1995 cmd
+= ['--bucket', bucket
]
1996 (result_json
, retcode
) = cluster
.admin(cmd
)
1998 assert False, 'failed to remove sync group flow directional groupid=%s, flow_id=%s, src_zones=%s, dest_zones=%s, bucket=%s' % (group
, flow_id
, src_zones
, dest_zones
, bucket
)
1999 return json
.loads(result_json
)
2001 def create_sync_group_pipe(cluster
, group
, pipe_id
, src_zones
, dest_zones
, bucket
= None, args
= []):
2002 cmd
= ['sync', 'group', 'pipe', 'create', '--group-id', group
, '--pipe-id' , pipe_id
, '--source-zones=%s' % src_zones
, '--dest-zones=%s' % dest_zones
]
2004 b_args
= '--bucket=' + bucket
2008 (result_json
, retcode
) = cluster
.admin(cmd
)
2010 assert False, 'failed to create sync group pipe groupid=%s, pipe_id=%s, src_zones=%s, dest_zones=%s, bucket=%s' % (group
, pipe_id
, src_zones
, dest_zones
, bucket
)
2011 return json
.loads(result_json
)
2013 def remove_sync_group_pipe(cluster
, group
, pipe_id
, bucket
= None, args
= None):
2014 cmd
= ['sync', 'group', 'pipe', 'remove', '--group-id', group
, '--pipe-id' , pipe_id
]
2016 b_args
= '--bucket=' + bucket
2020 (result_json
, retcode
) = cluster
.admin(cmd
)
2022 assert False, 'failed to remove sync group pipe groupid=%s, pipe_id=%s, src_zones=%s, dest_zones=%s, bucket=%s' % (group
, pipe_id
, src_zones
, dest_zones
, bucket
)
2023 return json
.loads(result_json
)
2025 def create_zone_bucket(zone
):
2026 b_name
= gen_bucket_name()
2027 log
.info('create bucket zone=%s name=%s', zone
.name
, b_name
)
2028 bucket
= zone
.create_bucket(b_name
)
2031 def create_object(zone_conn
, bucket
, objname
, content
):
2032 k
= new_key(zone_conn
, bucket
.name
, objname
)
2033 k
.set_contents_from_string(content
)
2035 def create_objects(zone_conn
, bucket
, obj_arr
, content
):
2036 for objname
in obj_arr
:
2037 create_object(zone_conn
, bucket
, objname
, content
)
2039 def check_object_exists(bucket
, objname
, content
= None):
2040 k
= bucket
.get_key(objname
)
2041 assert_not_equal(k
, None)
2042 if (content
!= None):
2043 assert_equal(k
.get_contents_as_string(encoding
='ascii'), content
)
2045 def check_objects_exist(bucket
, obj_arr
, content
= None):
2046 for objname
in obj_arr
:
2047 check_object_exists(bucket
, objname
, content
)
2049 def check_object_not_exists(bucket
, objname
):
2050 k
= bucket
.get_key(objname
)
2051 assert_equal(k
, None)
2053 def check_objects_not_exist(bucket
, obj_arr
):
2054 for objname
in obj_arr
:
2055 check_object_not_exists(bucket
, objname
)
2057 @attr('sync_policy')
2058 def test_sync_policy_config_zonegroup():
2060 test_sync_policy_config_zonegroup:
2061 test configuration of all sync commands
2063 zonegroup
= realm
.master_zonegroup()
2064 zonegroup_meta_checkpoint(zonegroup
)
2066 zonegroup_conns
= ZonegroupConns(zonegroup
)
2067 z1
, z2
= zonegroup
.zones
[0:2]
2068 c1
, c2
= (z1
.cluster
, z2
.cluster
)
2070 zones
= z1
.name
+","+z2
.name
2072 c1
.admin(['sync', 'policy', 'get'])
2074 # (a) zonegroup level
2075 create_sync_policy_group(c1
, "sync-group")
2076 set_sync_policy_group_status(c1
, "sync-group", "enabled")
2077 get_sync_policy_group(c1
, "sync-group")
2081 create_sync_group_flow_symmetrical(c1
, "sync-group", "sync-flow1", zones
)
2082 create_sync_group_flow_directional(c1
, "sync-group", "sync-flow2", z1
.name
, z2
.name
)
2084 create_sync_group_pipe(c1
, "sync-group", "sync-pipe", zones
, zones
)
2085 get_sync_policy_group(c1
, "sync-group")
2087 zonegroup
.period
.update(z1
, commit
=True)
2090 zc1
, zc2
= zonegroup_conns
.zones
[0:2]
2091 bucket
= create_zone_bucket(zc1
)
2092 bucket_name
= bucket
.name
2094 create_sync_policy_group(c1
, "sync-bucket", "allowed", bucket_name
)
2095 set_sync_policy_group_status(c1
, "sync-bucket", "enabled", bucket_name
)
2096 get_sync_policy_group(c1
, "sync-bucket", bucket_name
)
2098 get_sync_policy(c1
, bucket_name
)
2100 create_sync_group_flow_symmetrical(c1
, "sync-bucket", "sync-flow1", zones
, bucket_name
)
2101 create_sync_group_flow_directional(c1
, "sync-bucket", "sync-flow2", z1
.name
, z2
.name
, bucket_name
)
2103 create_sync_group_pipe(c1
, "sync-bucket", "sync-pipe", zones
, zones
, bucket_name
)
2104 get_sync_policy_group(c1
, "sync-bucket", bucket_name
)
2106 zonegroup_meta_checkpoint(zonegroup
)
2108 remove_sync_group_pipe(c1
, "sync-bucket", "sync-pipe", bucket_name
)
2109 remove_sync_group_flow_directional(c1
, "sync-bucket", "sync-flow2", z1
.name
, z2
.name
, bucket_name
)
2110 remove_sync_group_flow_symmetrical(c1
, "sync-bucket", "sync-flow1", zones
, bucket_name
)
2111 remove_sync_policy_group(c1
, "sync-bucket", bucket_name
)
2113 get_sync_policy(c1
, bucket_name
)
2115 zonegroup_meta_checkpoint(zonegroup
)
2117 remove_sync_group_pipe(c1
, "sync-group", "sync-pipe")
2118 remove_sync_group_flow_directional(c1
, "sync-group", "sync-flow2", z1
.name
, z2
.name
)
2119 remove_sync_group_flow_symmetrical(c1
, "sync-group", "sync-flow1")
2120 remove_sync_policy_group(c1
, "sync-group")
2124 zonegroup
.period
.update(z1
, commit
=True)
2128 @attr('sync_policy')
2129 def test_sync_flow_symmetrical_zonegroup_all():
2131 test_sync_flow_symmetrical_zonegroup_all:
2132 allows sync from all the zones to all other zones (default case)
2135 zonegroup
= realm
.master_zonegroup()
2136 zonegroup_meta_checkpoint(zonegroup
)
2138 zonegroup_conns
= ZonegroupConns(zonegroup
)
2140 (zoneA
, zoneB
) = zonegroup
.zones
[0:2]
2141 (zcA
, zcB
) = zonegroup_conns
.zones
[0:2]
2145 c1
.admin(['sync', 'policy', 'get'])
2147 zones
= zoneA
.name
+ ',' + zoneB
.name
2148 create_sync_policy_group(c1
, "sync-group")
2149 create_sync_group_flow_symmetrical(c1
, "sync-group", "sync-flow1", zones
)
2150 create_sync_group_pipe(c1
, "sync-group", "sync-pipe", zones
, zones
)
2151 set_sync_policy_group_status(c1
, "sync-group", "enabled")
2153 zonegroup
.period
.update(zoneA
, commit
=True)
2156 objnames
= [ 'obj1', 'obj2' ]
2160 # create bucket & object in all zones
2161 bucketA
= create_zone_bucket(zcA
)
2162 buckets
.append(bucketA
)
2163 create_object(zcA
, bucketA
, objnames
[0], content
)
2165 bucketB
= create_zone_bucket(zcB
)
2166 buckets
.append(bucketB
)
2167 create_object(zcB
, bucketB
, objnames
[1], content
)
2169 zonegroup_meta_checkpoint(zonegroup
)
2170 # 'zonegroup_data_checkpoint' currently fails for the zones not
2171 # allowed to sync. So as a workaround, data checkpoint is done
2172 # for only the ones configured.
2173 zone_data_checkpoint(zoneB
, zoneA
)
2175 # verify if objects are synced accross the zone
2176 bucket
= get_bucket(zcB
, bucketA
.name
)
2177 check_object_exists(bucket
, objnames
[0], content
)
2179 bucket
= get_bucket(zcA
, bucketB
.name
)
2180 check_object_exists(bucket
, objnames
[1], content
)
2182 remove_sync_policy_group(c1
, "sync-group")
2185 @attr('sync_policy')
2186 def test_sync_flow_symmetrical_zonegroup_select():
2188 test_sync_flow_symmetrical_zonegroup_select:
2189 allow sync between zoneA & zoneB
2190 verify zoneC doesnt sync the data
2193 zonegroup
= realm
.master_zonegroup()
2194 zonegroup_conns
= ZonegroupConns(zonegroup
)
2196 if len(zonegroup
.zones
) < 3:
2197 raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
2199 zonegroup_meta_checkpoint(zonegroup
)
2201 (zoneA
, zoneB
, zoneC
) = zonegroup
.zones
[0:3]
2202 (zcA
, zcB
, zcC
) = zonegroup_conns
.zones
[0:3]
2206 # configure sync policy
2207 zones
= zoneA
.name
+ ',' + zoneB
.name
2208 c1
.admin(['sync', 'policy', 'get'])
2209 create_sync_policy_group(c1
, "sync-group")
2210 create_sync_group_flow_symmetrical(c1
, "sync-group", "sync-flow", zones
)
2211 create_sync_group_pipe(c1
, "sync-group", "sync-pipe", zones
, zones
)
2212 set_sync_policy_group_status(c1
, "sync-group", "enabled")
2214 zonegroup
.period
.update(zoneA
, commit
=True)
2220 # create bucketA & objects in zoneA
2221 objnamesA
= [ 'obj1', 'obj2', 'obj3' ]
2222 bucketA
= create_zone_bucket(zcA
)
2223 buckets
.append(bucketA
)
2224 create_objects(zcA
, bucketA
, objnamesA
, content
)
2226 # create bucketB & objects in zoneB
2227 objnamesB
= [ 'obj4', 'obj5', 'obj6' ]
2228 bucketB
= create_zone_bucket(zcB
)
2229 buckets
.append(bucketB
)
2230 create_objects(zcB
, bucketB
, objnamesB
, content
)
2232 zonegroup_meta_checkpoint(zonegroup
)
2233 zone_data_checkpoint(zoneB
, zoneA
)
2234 zone_data_checkpoint(zoneA
, zoneB
)
2236 # verify if objnamesA synced to only zoneB but not zoneC
2237 bucket
= get_bucket(zcB
, bucketA
.name
)
2238 check_objects_exist(bucket
, objnamesA
, content
)
2240 bucket
= get_bucket(zcC
, bucketA
.name
)
2241 check_objects_not_exist(bucket
, objnamesA
)
2243 # verify if objnamesB synced to only zoneA but not zoneC
2244 bucket
= get_bucket(zcA
, bucketB
.name
)
2245 check_objects_exist(bucket
, objnamesB
, content
)
2247 bucket
= get_bucket(zcC
, bucketB
.name
)
2248 check_objects_not_exist(bucket
, objnamesB
)
2250 remove_sync_policy_group(c1
, "sync-group")
2253 @attr('sync_policy')
2254 def test_sync_flow_directional_zonegroup_select():
2256 test_sync_flow_directional_zonegroup_select:
2257 allow sync from only zoneA to zoneB
2259 verify that data doesn't get synced to zoneC and
2260 zoneA shouldn't sync data from zoneB either
2263 zonegroup
= realm
.master_zonegroup()
2264 zonegroup_conns
= ZonegroupConns(zonegroup
)
2266 if len(zonegroup
.zones
) < 3:
2267 raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
2269 zonegroup_meta_checkpoint(zonegroup
)
2271 (zoneA
, zoneB
, zoneC
) = zonegroup
.zones
[0:3]
2272 (zcA
, zcB
, zcC
) = zonegroup_conns
.zones
[0:3]
2276 # configure sync policy
2277 zones
= zoneA
.name
+ ',' + zoneB
.name
2278 c1
.admin(['sync', 'policy', 'get'])
2279 create_sync_policy_group(c1
, "sync-group")
2280 create_sync_group_flow_directional(c1
, "sync-group", "sync-flow", zoneA
.name
, zoneB
.name
)
2281 create_sync_group_pipe(c1
, "sync-group", "sync-pipe", zoneA
.name
, zoneB
.name
)
2282 set_sync_policy_group_status(c1
, "sync-group", "enabled")
2284 zonegroup
.period
.update(zoneA
, commit
=True)
2290 # create bucketA & objects in zoneA
2291 objnamesA
= [ 'obj1', 'obj2', 'obj3' ]
2292 bucketA
= create_zone_bucket(zcA
)
2293 buckets
.append(bucketA
)
2294 create_objects(zcA
, bucketA
, objnamesA
, content
)
2296 # create bucketB & objects in zoneB
2297 objnamesB
= [ 'obj4', 'obj5', 'obj6' ]
2298 bucketB
= create_zone_bucket(zcB
)
2299 buckets
.append(bucketB
)
2300 create_objects(zcB
, bucketB
, objnamesB
, content
)
2302 zonegroup_meta_checkpoint(zonegroup
)
2303 zone_data_checkpoint(zoneB
, zoneA
)
2305 # verify if objnamesA synced to only zoneB but not zoneC
2306 bucket
= get_bucket(zcB
, bucketA
.name
)
2307 check_objects_exist(bucket
, objnamesA
, content
)
2309 bucket
= get_bucket(zcC
, bucketA
.name
)
2310 check_objects_not_exist(bucket
, objnamesA
)
2312 # verify if objnamesB are not synced to either zoneA or zoneC
2313 bucket
= get_bucket(zcA
, bucketB
.name
)
2314 check_objects_not_exist(bucket
, objnamesB
)
2316 bucket
= get_bucket(zcC
, bucketB
.name
)
2317 check_objects_not_exist(bucket
, objnamesB
)
2320 verify the same at bucketA level
2321 configure another policy at bucketA level with src and dest
2322 zones specified to zoneA and zoneB resp.
2324 verify zoneA bucketA syncs to zoneB BucketA but not viceversa.
2326 # reconfigure zonegroup pipe & flow
2327 remove_sync_group_pipe(c1
, "sync-group", "sync-pipe")
2328 remove_sync_group_flow_directional(c1
, "sync-group", "sync-flow", zoneA
.name
, zoneB
.name
)
2329 create_sync_group_flow_symmetrical(c1
, "sync-group", "sync-flow1", zones
)
2330 create_sync_group_pipe(c1
, "sync-group", "sync-pipe", zones
, zones
)
2332 # change state to allowed
2333 set_sync_policy_group_status(c1
, "sync-group", "allowed")
2335 zonegroup
.period
.update(zoneA
, commit
=True)
2338 # configure sync policy for only bucketA and enable it
2339 create_sync_policy_group(c1
, "sync-bucket", "allowed", bucketA
.name
)
2340 create_sync_group_flow_symmetrical(c1
, "sync-bucket", "sync-flowA", zones
, bucketA
.name
)
2341 args
= ['--source-bucket=*', '--dest-bucket=*']
2342 create_sync_group_pipe(c1
, "sync-bucket", "sync-pipe", zoneA
.name
, zoneB
.name
, bucketA
.name
, args
)
2343 set_sync_policy_group_status(c1
, "sync-bucket", "enabled", bucketA
.name
)
2345 get_sync_policy(c1
, bucketA
.name
)
2347 zonegroup_meta_checkpoint(zonegroup
)
2349 # create objects in bucketA in zoneA and zoneB
2350 objnamesC
= [ 'obj7', 'obj8', 'obj9' ]
2351 objnamesD
= [ 'obj10', 'obj11', 'obj12' ]
2352 create_objects(zcA
, bucketA
, objnamesC
, content
)
2353 create_objects(zcB
, bucketA
, objnamesD
, content
)
2355 zonegroup_meta_checkpoint(zonegroup
)
2356 zone_data_checkpoint(zoneB
, zoneA
)
2358 # verify that objnamesC are synced to bucketA in zoneB
2359 bucket
= get_bucket(zcB
, bucketA
.name
)
2360 check_objects_exist(bucket
, objnamesC
, content
)
2362 # verify that objnamesD are not synced to bucketA in zoneA
2363 bucket
= get_bucket(zcA
, bucketA
.name
)
2364 check_objects_not_exist(bucket
, objnamesD
)
2366 remove_sync_policy_group(c1
, "sync-bucket", bucketA
.name
)
2367 remove_sync_policy_group(c1
, "sync-group")
2370 @attr('sync_policy')
2371 def test_sync_single_bucket():
2373 test_sync_single_bucket:
2374 Allow data sync for only bucketA but not for other buckets via
2377 (a) zonegroup: symmetrical flow but configure pipe for only bucketA.
2378 (b) bucket level: configure policy for bucketA
2381 zonegroup
= realm
.master_zonegroup()
2382 zonegroup_meta_checkpoint(zonegroup
)
2384 zonegroup_conns
= ZonegroupConns(zonegroup
)
2386 (zoneA
, zoneB
) = zonegroup
.zones
[0:2]
2387 (zcA
, zcB
) = zonegroup_conns
.zones
[0:2]
2391 c1
.admin(['sync', 'policy', 'get'])
2393 zones
= zoneA
.name
+ ',' + zoneB
.name
2396 objnames
= [ 'obj1', 'obj2', 'obj3' ]
2400 # create bucketA & bucketB in zoneA
2401 bucketA
= create_zone_bucket(zcA
)
2402 buckets
.append(bucketA
)
2403 bucketB
= create_zone_bucket(zcA
)
2404 buckets
.append(bucketB
)
2406 zonegroup_meta_checkpoint(zonegroup
)
2409 Method (a): configure pipe for only bucketA
2411 # configure sync policy & pipe for only bucketA
2412 create_sync_policy_group(c1
, "sync-group")
2413 create_sync_group_flow_symmetrical(c1
, "sync-group", "sync-flow1", zones
)
2414 args
= ['--source-bucket=' + bucketA
.name
, '--dest-bucket=' + bucketA
.name
]
2416 create_sync_group_pipe(c1
, "sync-group", "sync-pipe", zones
, zones
, None, args
)
2417 set_sync_policy_group_status(c1
, "sync-group", "enabled")
2419 zonegroup
.period
.update(zoneA
, commit
=True)
2423 # create objects in bucketA & bucketB
2424 create_objects(zcA
, bucketA
, objnames
, content
)
2425 create_object(zcA
, bucketB
, objnames
, content
)
2427 zonegroup_meta_checkpoint(zonegroup
)
2428 zone_data_checkpoint(zoneB
, zoneA
)
2430 # verify if bucketA objects are synced
2431 bucket
= get_bucket(zcB
, bucketA
.name
)
2432 check_objects_exist(bucket
, objnames
, content
)
2434 # bucketB objects should not be synced
2435 bucket
= get_bucket(zcB
, bucketB
.name
)
2436 check_objects_not_exist(bucket
, objnames
)
2440 Method (b): configure policy at only bucketA level
2442 # reconfigure group pipe
2443 remove_sync_group_pipe(c1
, "sync-group", "sync-pipe")
2444 create_sync_group_pipe(c1
, "sync-group", "sync-pipe", zones
, zones
)
2446 # change state to allowed
2447 set_sync_policy_group_status(c1
, "sync-group", "allowed")
2449 zonegroup
.period
.update(zoneA
, commit
=True)
2453 # configure sync policy for only bucketA and enable it
2454 create_sync_policy_group(c1
, "sync-bucket", "allowed", bucketA
.name
)
2455 create_sync_group_flow_symmetrical(c1
, "sync-bucket", "sync-flowA", zones
, bucketA
.name
)
2456 create_sync_group_pipe(c1
, "sync-bucket", "sync-pipe", zones
, zones
, bucketA
.name
)
2457 set_sync_policy_group_status(c1
, "sync-bucket", "enabled", bucketA
.name
)
2459 get_sync_policy(c1
, bucketA
.name
)
2461 # create object in bucketA
2462 create_object(zcA
, bucketA
, objnames
[2], content
)
2464 # create object in bucketA too
2465 create_object(zcA
, bucketB
, objnames
[2], content
)
2467 zonegroup_meta_checkpoint(zonegroup
)
2468 zone_data_checkpoint(zoneB
, zoneA
)
2470 # verify if bucketA objects are synced
2471 bucket
= get_bucket(zcB
, bucketA
.name
)
2472 check_object_exists(bucket
, objnames
[2], content
)
2474 # bucketB objects should not be synced
2475 bucket
= get_bucket(zcB
, bucketB
.name
)
2476 check_object_not_exists(bucket
, objnames
[2])
2478 remove_sync_policy_group(c1
, "sync-bucket", bucketA
.name
)
2479 remove_sync_policy_group(c1
, "sync-group")
2482 @attr('sync_policy')
2483 def test_sync_different_buckets():
2485 test_sync_different_buckets:
2486 sync zoneA bucketA to zoneB bucketB via below methods
2488 (a) zonegroup: directional flow but configure pipe for zoneA bucketA to zoneB bucketB
2489 (b) bucket: configure another policy at bucketA level with pipe set to
2490 another bucket(bucketB) in target zone.
2492 sync zoneA bucketA from zoneB bucketB
2493 (c) configure another policy at bucketA level with pipe set from
2494 another bucket(bucketB) in source zone.
2498 zonegroup
= realm
.master_zonegroup()
2499 zonegroup_meta_checkpoint(zonegroup
)
2501 zonegroup_conns
= ZonegroupConns(zonegroup
)
2503 (zoneA
, zoneB
) = zonegroup
.zones
[0:2]
2504 (zcA
, zcB
) = zonegroup_conns
.zones
[0:2]
2505 zones
= zoneA
.name
+ ',' + zoneB
.name
2509 c1
.admin(['sync', 'policy', 'get'])
2511 objnames
= [ 'obj1', 'obj2' ]
2512 objnamesB
= [ 'obj3', 'obj4' ]
2516 # create bucketA & bucketB in zoneA
2517 bucketA
= create_zone_bucket(zcA
)
2518 buckets
.append(bucketA
)
2519 bucketB
= create_zone_bucket(zcA
)
2520 buckets
.append(bucketB
)
2522 zonegroup_meta_checkpoint(zonegroup
)
2525 Method (a): zonegroup - configure pipe for only bucketA
2527 # configure pipe from zoneA bucketA to zoneB bucketB
2528 create_sync_policy_group(c1
, "sync-group")
2529 create_sync_group_flow_symmetrical(c1
, "sync-group", "sync-flow1", zones
)
2530 args
= ['--source-bucket=' + bucketA
.name
, '--dest-bucket=' + bucketB
.name
]
2531 create_sync_group_pipe(c1
, "sync-group", "sync-pipe", zoneA
.name
, zoneB
.name
, None, args
)
2532 set_sync_policy_group_status(c1
, "sync-group", "enabled")
2533 zonegroup
.period
.update(zoneA
, commit
=True)
2536 # create objects in bucketA
2537 create_objects(zcA
, bucketA
, objnames
, content
)
2539 zonegroup_meta_checkpoint(zonegroup
)
2540 zone_data_checkpoint(zoneB
, zoneA
)
2542 # verify that objects are synced to bucketB in zoneB
2543 # but not to bucketA
2544 bucket
= get_bucket(zcB
, bucketA
.name
)
2545 check_objects_not_exist(bucket
, objnames
)
2547 bucket
= get_bucket(zcB
, bucketB
.name
)
2548 check_objects_exist(bucket
, objnames
, content
)
2550 Method (b): configure policy at only bucketA level with pipe
2551 set to bucketB in target zone
2554 remove_sync_group_pipe(c1
, "sync-group", "sync-pipe")
2555 create_sync_group_pipe(c1
, "sync-group", "sync-pipe", zones
, zones
)
2557 # change state to allowed
2558 set_sync_policy_group_status(c1
, "sync-group", "allowed")
2560 zonegroup
.period
.update(zoneA
, commit
=True)
2563 # configure sync policy for only bucketA and enable it
2564 create_sync_policy_group(c1
, "sync-bucket", "allowed", bucketA
.name
)
2565 create_sync_group_flow_symmetrical(c1
, "sync-bucket", "sync-flowA", zones
, bucketA
.name
)
2566 args
= ['--source-bucket=*', '--dest-bucket=' + bucketB
.name
]
2567 create_sync_group_pipe(c1
, "sync-bucket", "sync-pipeA", zones
, zones
, bucketA
.name
, args
)
2568 set_sync_policy_group_status(c1
, "sync-bucket", "enabled", bucketA
.name
)
2570 get_sync_policy(c1
, bucketA
.name
)
2572 objnamesC
= [ 'obj5', 'obj6' ]
2574 zonegroup_meta_checkpoint(zonegroup
)
2575 # create objects in bucketA
2576 create_objects(zcA
, bucketA
, objnamesC
, content
)
2578 zonegroup_meta_checkpoint(zonegroup
)
2579 zone_data_checkpoint(zoneB
, zoneA
)
2582 # verify that objects are synced to bucketB in zoneB
2583 # but not to bucketA
2585 bucket
= get_bucket(zcB
, bucketA
.name
)
2586 check_objects_not_exist(bucket
, objnamesC
)
2588 bucket
= get_bucket(zcB
, bucketB
.name
)
2589 check_objects_exist(bucket
, objnamesC
, content
)
2591 remove_sync_policy_group(c1
, "sync-bucket", bucketA
.name
)
2592 zonegroup_meta_checkpoint(zonegroup
)
2593 get_sync_policy(c1
, bucketA
.name
)
2596 Method (c): configure policy at only bucketA level with pipe
2597 set from bucketB in source zone
2598 verify zoneA bucketA syncs from zoneB BucketB but not bucketA
2601 # configure sync policy for only bucketA and enable it
2602 create_sync_policy_group(c1
, "sync-bucket", "allowed", bucketA
.name
)
2603 create_sync_group_flow_symmetrical(c1
, "sync-bucket", "sync-flowA", zones
, bucketA
.name
)
2604 args
= ['--source-bucket=' + bucketB
.name
, '--dest-bucket=' + '*']
2605 create_sync_group_pipe(c1
, "sync-bucket", "sync-pipe", zones
, zones
, bucketA
.name
, args
)
2606 set_sync_policy_group_status(c1
, "sync-bucket", "enabled", bucketA
.name
)
2608 get_sync_policy(c1
, bucketA
.name
)
2610 # create objects in bucketA & B in ZoneB
2611 objnamesD
= [ 'obj7', 'obj8' ]
2612 objnamesE
= [ 'obj9', 'obj10' ]
2614 create_objects(zcB
, bucketA
, objnamesD
, content
)
2615 create_objects(zcB
, bucketB
, objnamesE
, content
)
2617 zonegroup_meta_checkpoint(zonegroup
)
2618 zone_data_checkpoint(zoneA
, zoneB
)
2620 # verify that objects from only bucketB are synced to
2623 bucket
= get_bucket(zcA
, bucketA
.name
)
2624 check_objects_not_exist(bucket
, objnamesD
)
2625 check_objects_exist(bucket
, objnamesE
, content
)
2627 remove_sync_policy_group(c1
, "sync-bucket", bucketA
.name
)
2628 remove_sync_policy_group(c1
, "sync-group")
2631 @attr('sync_policy')
2632 def test_sync_multiple_buckets_to_single():
2634 test_sync_multiple_buckets_to_single:
2636 (a) pipe: sync zoneA bucketA,bucketB to zoneB bucketB
2638 (b) configure another policy at bucketA level with pipe configured
2639 to sync from multiple buckets (bucketA & bucketB)
2641 verify zoneA bucketA & bucketB syncs to zoneB BucketB
2644 zonegroup
= realm
.master_zonegroup()
2645 zonegroup_meta_checkpoint(zonegroup
)
2647 zonegroup_conns
= ZonegroupConns(zonegroup
)
2649 (zoneA
, zoneB
) = zonegroup
.zones
[0:2]
2650 (zcA
, zcB
) = zonegroup_conns
.zones
[0:2]
2651 zones
= zoneA
.name
+ ',' + zoneB
.name
2655 c1
.admin(['sync', 'policy', 'get'])
2657 objnamesA
= [ 'obj1', 'obj2' ]
2658 objnamesB
= [ 'obj3', 'obj4' ]
2662 # create bucketA & bucketB in zoneA
2663 bucketA
= create_zone_bucket(zcA
)
2664 buckets
.append(bucketA
)
2665 bucketB
= create_zone_bucket(zcA
)
2666 buckets
.append(bucketB
)
2668 zonegroup_meta_checkpoint(zonegroup
)
2670 # configure pipe from zoneA bucketA,bucketB to zoneB bucketB
2671 create_sync_policy_group(c1
, "sync-group")
2672 create_sync_group_flow_directional(c1
, "sync-group", "sync-flow", zoneA
.name
, zoneB
.name
)
2673 source_buckets
= [ bucketA
.name
, bucketB
.name
]
2674 for source_bucket
in source_buckets
:
2675 args
= ['--source-bucket=' + source_bucket
, '--dest-bucket=' + bucketB
.name
]
2676 create_sync_group_pipe(c1
, "sync-group", "sync-pipe-%s" % source_bucket
, zoneA
.name
, zoneB
.name
, None, args
)
2678 set_sync_policy_group_status(c1
, "sync-group", "enabled")
2679 zonegroup
.period
.update(zoneA
, commit
=True)
2682 # create objects in bucketA & bucketB
2683 create_objects(zcA
, bucketA
, objnamesA
, content
)
2684 create_objects(zcA
, bucketB
, objnamesB
, content
)
2686 zonegroup_meta_checkpoint(zonegroup
)
2687 zone_data_checkpoint(zoneB
, zoneA
)
2689 # verify that both zoneA bucketA & bucketB objects are synced to
2690 # bucketB in zoneB but not to bucketA
2691 bucket
= get_bucket(zcB
, bucketA
.name
)
2692 check_objects_not_exist(bucket
, objnamesA
)
2693 check_objects_not_exist(bucket
, objnamesB
)
2695 bucket
= get_bucket(zcB
, bucketB
.name
)
2696 check_objects_exist(bucket
, objnamesA
, content
)
2697 check_objects_exist(bucket
, objnamesB
, content
)
2700 Method (b): configure at bucket level
2702 # reconfigure pipe & flow
2703 for source_bucket
in source_buckets
:
2704 remove_sync_group_pipe(c1
, "sync-group", "sync-pipe-%s" % source_bucket
)
2705 remove_sync_group_flow_directional(c1
, "sync-group", "sync-flow", zoneA
.name
, zoneB
.name
)
2706 create_sync_group_flow_symmetrical(c1
, "sync-group", "sync-flow1", zones
)
2707 create_sync_group_pipe(c1
, "sync-group", "sync-pipe", zones
, zones
)
2709 # change state to allowed
2710 set_sync_policy_group_status(c1
, "sync-group", "allowed")
2712 zonegroup
.period
.update(zoneA
, commit
=True)
2715 objnamesC
= [ 'obj5', 'obj6' ]
2716 objnamesD
= [ 'obj7', 'obj8' ]
2718 # configure sync policy for only bucketA and enable it
2719 create_sync_policy_group(c1
, "sync-bucket", "allowed", bucketA
.name
)
2720 create_sync_group_flow_symmetrical(c1
, "sync-bucket", "sync-flowA", zones
, bucketA
.name
)
2721 source_buckets
= [ bucketA
.name
, bucketB
.name
]
2722 for source_bucket
in source_buckets
:
2723 args
= ['--source-bucket=' + source_bucket
, '--dest-bucket=' + '*']
2724 create_sync_group_pipe(c1
, "sync-bucket", "sync-pipe-%s" % source_bucket
, zoneA
.name
, zoneB
.name
, bucketA
.name
, args
)
2726 set_sync_policy_group_status(c1
, "sync-bucket", "enabled", bucketA
.name
)
2730 zonegroup_meta_checkpoint(zonegroup
)
2731 # create objects in bucketA
2732 create_objects(zcA
, bucketA
, objnamesC
, content
)
2733 create_objects(zcA
, bucketB
, objnamesD
, content
)
2735 zonegroup_meta_checkpoint(zonegroup
)
2736 zone_data_checkpoint(zoneB
, zoneA
)
2738 # verify that both zoneA bucketA & bucketB objects are synced to
2739 # bucketA in zoneB but not to bucketB
2740 bucket
= get_bucket(zcB
, bucketB
.name
)
2741 check_objects_not_exist(bucket
, objnamesC
)
2742 check_objects_not_exist(bucket
, objnamesD
)
2744 bucket
= get_bucket(zcB
, bucketA
.name
)
2745 check_objects_exist(bucket
, objnamesD
, content
)
2746 check_objects_exist(bucket
, objnamesD
, content
)
2748 remove_sync_policy_group(c1
, "sync-bucket", bucketA
.name
)
2749 remove_sync_policy_group(c1
, "sync-group")
2752 @attr('sync_policy')
2753 def test_sync_single_bucket_to_multiple():
2755 test_sync_single_bucket_to_multiple:
2757 (a) pipe: sync zoneA bucketA to zoneB bucketA & bucketB
2759 (b) configure another policy at bucketA level with pipe configured
2760 to sync to multiple buckets (bucketA & bucketB)
2762 verify zoneA bucketA syncs to zoneB bucketA & bucketB
2765 zonegroup
= realm
.master_zonegroup()
2766 zonegroup_meta_checkpoint(zonegroup
)
2768 zonegroup_conns
= ZonegroupConns(zonegroup
)
2770 (zoneA
, zoneB
) = zonegroup
.zones
[0:2]
2771 (zcA
, zcB
) = zonegroup_conns
.zones
[0:2]
2772 zones
= zoneA
.name
+ ',' + zoneB
.name
2776 c1
.admin(['sync', 'policy', 'get'])
2778 objnamesA
= [ 'obj1', 'obj2' ]
2782 # create bucketA & bucketB in zoneA
2783 bucketA
= create_zone_bucket(zcA
)
2784 buckets
.append(bucketA
)
2785 bucketB
= create_zone_bucket(zcA
)
2786 buckets
.append(bucketB
)
2788 zonegroup_meta_checkpoint(zonegroup
)
2790 # configure pipe from zoneA bucketA to zoneB bucketA, bucketB
2791 create_sync_policy_group(c1
, "sync-group")
2792 create_sync_group_flow_symmetrical(c1
, "sync-group", "sync-flow1", zones
)
2794 dest_buckets
= [ bucketA
.name
, bucketB
.name
]
2795 for dest_bucket
in dest_buckets
:
2796 args
= ['--source-bucket=' + bucketA
.name
, '--dest-bucket=' + dest_bucket
]
2797 create_sync_group_pipe(c1
, "sync-group", "sync-pipe-%s" % dest_bucket
, zoneA
.name
, zoneB
.name
, None, args
)
2799 create_sync_group_pipe(c1
, "sync-group", "sync-pipe", zoneA
.name
, zoneB
.name
, None, args
)
2800 set_sync_policy_group_status(c1
, "sync-group", "enabled")
2801 zonegroup
.period
.update(zoneA
, commit
=True)
2804 # create objects in bucketA
2805 create_objects(zcA
, bucketA
, objnamesA
, content
)
2807 zonegroup_meta_checkpoint(zonegroup
)
2808 zone_data_checkpoint(zoneB
, zoneA
)
2810 # verify that objects from zoneA bucketA are synced to both
2811 # bucketA & bucketB in zoneB
2812 bucket
= get_bucket(zcB
, bucketA
.name
)
2813 check_objects_exist(bucket
, objnamesA
, content
)
2815 bucket
= get_bucket(zcB
, bucketB
.name
)
2816 check_objects_exist(bucket
, objnamesA
, content
)
2819 Method (b): configure at bucket level
2821 remove_sync_group_pipe(c1
, "sync-group", "sync-pipe")
2822 create_sync_group_pipe(c1
, "sync-group", "sync-pipe", '*', '*')
2824 # change state to allowed
2825 set_sync_policy_group_status(c1
, "sync-group", "allowed")
2827 zonegroup
.period
.update(zoneA
, commit
=True)
2830 objnamesB
= [ 'obj3', 'obj4' ]
2832 # configure sync policy for only bucketA and enable it
2833 create_sync_policy_group(c1
, "sync-bucket", "allowed", bucketA
.name
)
2834 create_sync_group_flow_symmetrical(c1
, "sync-bucket", "sync-flowA", zones
, bucketA
.name
)
2835 dest_buckets
= [ bucketA
.name
, bucketB
.name
]
2836 for dest_bucket
in dest_buckets
:
2837 args
= ['--source-bucket=' + '*', '--dest-bucket=' + dest_bucket
]
2838 create_sync_group_pipe(c1
, "sync-bucket", "sync-pipe-%s" % dest_bucket
, zoneA
.name
, zoneB
.name
, bucketA
.name
, args
)
2840 set_sync_policy_group_status(c1
, "sync-bucket", "enabled", bucketA
.name
)
2844 zonegroup_meta_checkpoint(zonegroup
)
2845 # create objects in bucketA
2846 create_objects(zcA
, bucketA
, objnamesB
, content
)
2848 zonegroup_meta_checkpoint(zonegroup
)
2849 zone_data_checkpoint(zoneB
, zoneA
)
2851 # verify that objects from zoneA bucketA are synced to both
2852 # bucketA & bucketB in zoneB
2853 bucket
= get_bucket(zcB
, bucketA
.name
)
2854 check_objects_exist(bucket
, objnamesB
, content
)
2856 bucket
= get_bucket(zcB
, bucketB
.name
)
2857 check_objects_exist(bucket
, objnamesB
, content
)
2859 remove_sync_policy_group(c1
, "sync-bucket", bucketA
.name
)
2860 remove_sync_policy_group(c1
, "sync-group")